This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4da93d0b60a fix(serialized_objects): handle both inlet and outlet in v1 to v2 convert (#49286)
4da93d0b60a is described below

commit 4da93d0b60a1adf310ec06929d6676360a639af7
Author: Wei Lee <[email protected]>
AuthorDate: Tue Apr 15 21:13:34 2025 +0800

    fix(serialized_objects): handle both inlet and outlet in v1 to v2 convert (#49286)
    
    * fix(serialized_objects): handle both inlet and outlet in v1 to v2 convert
    
    * fix-migration
    
    ---------
    
    Co-authored-by: vatsrahul1001 <[email protected]>
---
 .../airflow/serialization/serialized_objects.py    | 17 ++---
 .../unit/serialization/test_dag_serialization.py   | 72 +++++++++++++++++++++-
 2 files changed, 78 insertions(+), 11 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 2340c171970..8dd2b9294d2 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -1840,7 +1840,8 @@ class SerializedDAG(DAG, BaseSerialization):
             return obj
 
         for old, new in dag_renames:
-            dag_dict[new] = dag_dict.pop(old)
+            if old in dag_dict:
+                dag_dict[new] = dag_dict.pop(old)
 
         if default_args := dag_dict.get("default_args"):
             for k in tasks_remove:
@@ -1903,15 +1904,15 @@ class SerializedDAG(DAG, BaseSerialization):
                 task_var.pop(k, None)
             for old, new in task_renames:
                 task_var[new] = task_var.pop(old)
-            for item in task_var.get("outlets", []):
+            for item in itertools.chain(*(task_var.get(key, []) for key in ("inlets", "outlets"))):
+                original_item_type = item["__type"]
                 if isinstance(item, dict) and "__type" in item:
-                    item["__type"] = replace_dataset_in_str(item["__type"])
-            for item in task_var.get("inlets", []):
-                if isinstance(item, dict) and "__type" in item:
-                    item["__type"] = replace_dataset_in_str(item["__type"])
+                    item["__type"] = replace_dataset_in_str(original_item_type)
+
                 var_ = item["__var"]
-                var_["name"] = None
-                var_["group"] = None
+                if original_item_type == "dataset":
+                    var_["name"] = var_["uri"]
+                var_["group"] = "asset"
 
         # Set on the root TG
         dag_dict["task_group"]["group_display_name"] = ""
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index fb2f3d8f1f7..d3e605063d8 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -64,7 +64,7 @@ from airflow.models.xcom import XComModel
 from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.sensors.bash import BashSensor
-from airflow.sdk import teardown
+from airflow.sdk import AssetAlias, teardown
 from airflow.sdk.bases.decorator import DecoratedOperator
 from airflow.sdk.definitions._internal.expandinput import EXPAND_INPUT_EMPTY
 from airflow.sdk.definitions.asset import Asset, AssetUniqueKey
@@ -203,6 +203,32 @@ serialized_simple_dag_ground_truth = {
                     "weight_rule": "downstream",
                     "start_trigger_args": None,
                     "start_from_trigger": False,
+                    "inlets": [
+                        {
+                            "__type": "asset",
+                            "__var": {
+                                "extra": {},
+                                "group": "asset",
+                                "name": "asset-1",
+                                "uri": "asset-1",
+                            },
+                        },
+                        {
+                            "__type": "asset_alias",
+                            "__var": {"group": "asset", "name": "alias-name"},
+                        },
+                    ],
+                    "outlets": [
+                        {
+                            "__type": "asset",
+                            "__var": {
+                                "extra": {},
+                                "group": "asset",
+                                "name": "asset-2",
+                                "uri": "asset-2",
+                            },
+                        },
+                    ],
                 },
             },
             {
@@ -252,7 +278,15 @@ serialized_simple_dag_ground_truth = {
             },
         },
         "edge_info": {},
-        "dag_dependencies": [],
+        "dag_dependencies": [
+            {
+                "dependency_id": '{"name": "asset-2", "uri": "asset-2"}',
+                "dependency_type": "asset",
+                "label": "asset-2",
+                "source": "simple_dag",
+                "target": "asset",
+            },
+        ],
         "params": [],
         "tags": [],
     },
@@ -301,6 +335,8 @@ def make_simple_dag():
             owner="airflow",
             executor_config={"pod_override": executor_config_pod},
             doc_md="### Task Tutorial Documentation",
+            inlets=[Asset("asset-1"), AssetAlias(name="alias-name")],
+            outlets=Asset("asset-2"),
         )
     return dag
 
@@ -3137,6 +3173,28 @@ def test_handle_v1_serdag():
                         "weight_rule": "downstream",
                         "start_trigger_args": None,
                         "start_from_trigger": False,
+                        "inlets": [
+                            {
+                                "__type": "dataset",
+                                "__var": {
+                                    "extra": {},
+                                    "uri": "asset-1",
+                                },
+                            },
+                            {
+                                "__type": "dataset_alias",
+                                "__var": {"name": "alias-name"},
+                            },
+                        ],
+                        "outlets": [
+                            {
+                                "__type": "dataset",
+                                "__var": {
+                                    "extra": {},
+                                    "uri": "asset-2",
+                                },
+                            },
+                        ],
                     },
                 },
                 {
@@ -3187,7 +3245,15 @@ def test_handle_v1_serdag():
                 },
             },
             "edge_info": {},
-            "dag_dependencies": [],
+            "dag_dependencies": [
+                {
+                    "dependency_id": '{"name": "asset-2", "uri": "asset-2"}',
+                    "dependency_type": "asset",
+                    "label": "asset-2",
+                    "source": "simple_dag",
+                    "target": "asset",
+                },
+            ],
             "params": [],
         },
     }

Reply via email to