This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 65c4900bd7b fix(serialized_objects): fix how dataset/asset 
dag_dependency is converted from v1 to v2 (#49281)
65c4900bd7b is described below

commit 65c4900bd7bc4cc1d5227f68dbbc2e5a33f4eaf4
Author: Wei Lee <[email protected]>
AuthorDate: Tue Apr 15 22:28:52 2025 +0800

    fix(serialized_objects): fix how dataset/asset dag_dependency is converted 
from v1 to v2 (#49281)
    
    ## Why
    In the original conversion, every "dataset" was replaced with "asset".
    However, this meant `Dataset("dataset-uri")` would be interpreted as
    `Asset("asset-uri")`, whereas it should become `Asset("dataset-uri")`.
    
    ## What
    * If a DAG dependency has no "label" field, fill it with its "dependency_id".
    * If the "dependency_type" is "dataset" or "dataset-alias", update it to
      "asset" or "asset-alias", respectively.
        * If the source/target value is "dataset" or "dataset-alias" and equals
          its "dependency_type" (i.e., it is a root node or an end node), update
          it to "asset" or "asset-alias", respectively.
        * If the source/target value starts with "dataset:" or "dataset-alias:"
          (i.e., it is an intermediate node), update it to use the "asset:" or
          "asset-alias:" prefix, respectively.
    
    
    ---------
    
    Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
 .../airflow/serialization/serialized_objects.py    |  18 ++-
 .../unit/serialization/test_dag_serialization.py   | 122 ++++++++++++++++++++-
 2 files changed, 132 insertions(+), 8 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py 
b/airflow-core/src/airflow/serialization/serialized_objects.py
index 8dd2b9294d2..1f425c4facb 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -1892,9 +1892,21 @@ class SerializedDAG(DAG, BaseSerialization):
 
         if "dag_dependencies" in dag_dict:
             for dep in dag_dict["dag_dependencies"]:
-                for fld in ("dependency_type", "target", "source"):
-                    if dep.get(fld) == "dataset":
-                        dep[fld] = "asset"
+                dep_type = dep.get("dependency_type")
+                if dep_type in ("dataset", "dataset-alias"):
+                    dep["dependency_type"] = dep_type.replace("dataset", 
"asset")
+
+                if not dep.get("label"):
+                    dep["label"] = dep["dependency_id"]
+
+                for fld in ("target", "source"):
+                    val = dep.get(fld)
+                    if val == dep_type and val in ("dataset", "dataset-alias"):
+                        dep[fld] = dep[fld].replace("dataset", "asset")
+                    elif val.startswith("dataset:"):
+                        dep[fld] = dep[fld].replace("dataset:", "asset:")
+                    elif val.startswith("dataset-alias:"):
+                        dep[fld] = dep[fld].replace("dataset-alias:", 
"asset-alias:")
 
         for task in dag_dict["tasks"]:
             task_var: dict = task["__var"]
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py 
b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index d3e605063d8..bff4cbfd5d4 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -3246,17 +3246,128 @@ def test_handle_v1_serdag():
             },
             "edge_info": {},
             "dag_dependencies": [
+                # dataset as schedule (source)
                 {
-                    "dependency_id": '{"name": "asset-2", "uri": "asset-2"}',
-                    "dependency_type": "asset",
-                    "label": "asset-2",
-                    "source": "simple_dag",
-                    "target": "asset",
+                    "source": "dataset",
+                    "target": "dag1",
+                    "dependency_type": "dataset",
+                    "dependency_id": "dataset_uri_1",
+                },
+                # dataset alias (resolved) as schedule (source)
+                {
+                    "source": "dataset",
+                    "target": "dataset-alias:alias_name_1",
+                    "dependency_type": "dataset",
+                    "dependency_id": "dataset_uri_2",
+                },
+                {
+                    "source": "dataset:alias_name_1",
+                    "target": "dag2",
+                    "dependency_type": "dataset-alias",
+                    "dependency_id": "alias_name_1",
+                },
+                # dataset alias (not resolved) as schedule (source)
+                {
+                    "source": "dataset-alias",
+                    "target": "dag2",
+                    "dependency_type": "dataset-alias",
+                    "dependency_id": "alias_name_2",
+                },
+                # dataset as outlets (target)
+                {
+                    "source": "dag10",
+                    "target": "dataset",
+                    "dependency_type": "dataset",
+                    "dependency_id": "dataset_uri_10",
+                },
+                # dataset alias (resolved) as outlets (target)
+                {
+                    "source": "dag20",
+                    "target": "dataset-alias:alias_name_10",
+                    "dependency_type": "dataset",
+                    "dependency_id": "dataset_uri_20",
+                },
+                {
+                    "source": "dataset:dataset_uri_20",
+                    "target": "dataset-alias",
+                    "dependency_type": "dataset-alias",
+                    "dependency_id": "alias_name_10",
+                },
+                # dataset alias (not resolved) as outlets (target)
+                {
+                    "source": "dag2",
+                    "target": "dataset-alias",
+                    "dependency_type": "dataset-alias",
+                    "dependency_id": "alias_name_2",
                 },
             ],
             "params": [],
         },
     }
+    expected_dag_dependencies = [
+        # asset as schedule (source)
+        {
+            "dependency_id": "dataset_uri_1",
+            "dependency_type": "asset",
+            "label": "dataset_uri_1",
+            "source": "asset",
+            "target": "dag1",
+        },
+        # asset alias (resolved) as schedule (source)
+        {
+            "dependency_id": "dataset_uri_2",
+            "dependency_type": "asset",
+            "label": "dataset_uri_2",
+            "source": "asset",
+            "target": "asset-alias:alias_name_1",
+        },
+        {
+            "dependency_id": "alias_name_1",
+            "dependency_type": "asset-alias",
+            "label": "alias_name_1",
+            "source": "asset:alias_name_1",
+            "target": "dag2",
+        },
+        # asset alias (not resolved) as schedule (source)
+        {
+            "dependency_id": "alias_name_2",
+            "dependency_type": "asset-alias",
+            "label": "alias_name_2",
+            "source": "asset-alias",
+            "target": "dag2",
+        },
+        # asset as outlets (target)
+        {
+            "dependency_id": "dataset_uri_10",
+            "dependency_type": "asset",
+            "label": "dataset_uri_10",
+            "source": "dag10",
+            "target": "asset",
+        },
+        # asset alias (resolved) as outlets (target)
+        {
+            "dependency_id": "dataset_uri_20",
+            "dependency_type": "asset",
+            "label": "dataset_uri_20",
+            "source": "dag20",
+            "target": "asset-alias:alias_name_10",
+        },
+        {
+            "dependency_id": "alias_name_10",
+            "dependency_type": "asset-alias",
+            "label": "alias_name_10",
+            "source": "asset:dataset_uri_20",
+            "target": "asset-alias",
+        },
+        # asset alias (not resolved) as outlets (target)
+        {
+            "dependency_id": "alias_name_2",
+            "dependency_type": "asset-alias",
+            "label": "alias_name_2",
+            "source": "dag2",
+            "target": "asset-alias",
+        },
+    ]
 
     SerializedDAG.conversion_v1_to_v2(v1)
 
@@ -3266,6 +3377,7 @@ def test_handle_v1_serdag():
     v1["dag"]["disable_bundle_versioning"] = False
 
     expected = copy.deepcopy(serialized_simple_dag_ground_truth)
+    expected["dag"]["dag_dependencies"] = expected_dag_dependencies
     del expected["dag"]["tasks"][1]["__var"]["_operator_extra_links"]
 
     assert v1 == expected

Reply via email to