This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 59676b0acc5ac4db916f9b843bfb980ff4e3ca2d
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Sep 18 16:28:48 2025 +0100

    Bump DAG Serialization version to 3 (#55836)
    
    Increments `SERIALIZER_VERSION` from 2 to 3 to reflect significant changes
    to parameter serialization/deserialization logic as part of SDK decoupling
    work. While changes maintain backward compatibility, Ash & I discussed it
    would be wise to change the format since it is a significant change and would
    help in debugging as well.
    
    All existing v1 and v2 serialized DAGs remain compatible.
    
    (cherry picked from commit a0fb6fee0244463e79726578b54398d6179c6f5f)
---
 .../dag-serialization.rst                          |   2 +-
 .../airflow/serialization/serialized_objects.py    |  12 +-
 .../unit/serialization/test_dag_serialization.py   | 217 ++++++++++++++++++++-
 3 files changed, 221 insertions(+), 10 deletions(-)

diff --git a/airflow-core/docs/administration-and-deployment/dag-serialization.rst b/airflow-core/docs/administration-and-deployment/dag-serialization.rst
index 89384f3436b..986821d7e2b 100644
--- a/airflow-core/docs/administration-and-deployment/dag-serialization.rst
+++ b/airflow-core/docs/administration-and-deployment/dag-serialization.rst
@@ -153,7 +153,7 @@ Serialized Dags now include a ``client_defaults`` section that contains common d
 .. code-block:: json
 
     {
-      "__version": 2,
+      "__version": 3,
       "client_defaults": {
         "tasks": {
           "retry_delay": 300.0,
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index caa40ce93dd..4801eda3311 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -616,7 +616,7 @@ class BaseSerialization:
 
     _CONSTRUCTOR_PARAMS: dict[str, Parameter] = {}
 
-    SERIALIZER_VERSION = 2
+    SERIALIZER_VERSION = 3
 
     @classmethod
     def to_json(cls, var: Any) -> str:
@@ -2776,14 +2776,22 @@ class SerializedDAG(BaseSerialization):
         # Set on the root TG
         dag_dict["task_group"]["group_display_name"] = ""
 
+    @staticmethod
+    def conversion_v2_to_v3(ser_obj: dict) -> None:
+        # v2 -> v3 differences are minimal (mainly client_defaults optimization and
+        # field presence), so the in-place upgrade only needs to bump "__version".
+        ser_obj["__version"] = 3
+
     @classmethod
     def from_dict(cls, serialized_obj: dict) -> SerializedDAG:
         """Deserializes a python dict in to the DAG and operators it 
contains."""
         ver = serialized_obj.get("__version", "<not present>")
-        if ver not in (1, 2):
+        if ver not in (1, 2, 3):
             raise ValueError(f"Unsure how to deserialize version {ver!r}")
         if ver == 1:
             cls.conversion_v1_to_v2(serialized_obj)
+        if ver == 2:
+            cls.conversion_v2_to_v3(serialized_obj)
 
         # Extract client_defaults for hierarchical defaults resolution
         client_defaults = serialized_obj.get("client_defaults", {})
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 5ff0a42df49..4ed84c32fbe 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -116,7 +116,7 @@ executor_config_pod = k8s.V1Pod(
 TYPE = Encoding.TYPE
 VAR = Encoding.VAR
 serialized_simple_dag_ground_truth = {
-    "__version": 2,
+    "__version": 3,
     "client_defaults": {"tasks": {"retry_delay": 300.0}},
     "dag": {
         "default_args": {
@@ -957,7 +957,7 @@ class TestStringifiedDAGs:
         expected_timetable,
     ):
         serialized = {
-            "__version": 2,
+            "__version": 3,
             "dag": {
                 "default_args": {"__type": "dict", "__var": {}},
                 "dag_id": "simple_dag",
@@ -973,7 +973,7 @@ class TestStringifiedDAGs:
 
     def test_deserialization_timetable_unregistered(self):
         serialized = {
-            "__version": 2,
+            "__version": 3,
             "dag": {
                 "default_args": {"__type": "dict", "__var": {}},
                 "dag_id": "simple_dag",
@@ -2242,7 +2242,7 @@ class TestStringifiedDAGs:
     def test_params_upgrade(self):
         """When pre-2.2.0 param (i.e. primitive) is deserialized we convert to 
Param"""
         serialized = {
-            "__version": 2,
+            "__version": 3,
             "dag": {
                 "dag_id": "simple_dag",
                 "fileloc": "/path/to/file.py",
@@ -2263,7 +2263,7 @@ class TestStringifiedDAGs:
         This test asserts that the params are still deserialized properly.
         """
         serialized = {
-            "__version": 2,
+            "__version": 3,
             "dag": {
                 "dag_id": "simple_dag",
                 "fileloc": "/path/to/file.py",
@@ -2290,7 +2290,7 @@ class TestStringifiedDAGs:
         test only to ensure that params stored in 2.2.0 can still be parsed 
correctly.
         """
         serialized = {
-            "__version": 2,
+            "__version": 3,
             "dag": {
                 "dag_id": "simple_dag",
                 "fileloc": "/path/to/file.py",
@@ -2307,7 +2307,7 @@ class TestStringifiedDAGs:
 
     def test_params_serialize_default(self):
         serialized = {
-            "__version": 2,
+            "__version": 3,
             "dag": {
                 "dag_id": "simple_dag",
                 "fileloc": "/path/to/file.py",
@@ -3297,6 +3297,7 @@ def test_handle_v1_serdag():
     ]
 
     SerializedDAG.conversion_v1_to_v2(v1)
+    SerializedDAG.conversion_v2_to_v3(v1)
 
     # Update a few subtle differences
     v1["dag"]["tags"] = []
@@ -3311,6 +3312,208 @@ def test_handle_v1_serdag():
     assert v1 == expected
 
 
+def test_handle_v2_serdag():
+    """Test that a v2 serialized DAG deserializes to the same DAG as the v3 ground truth."""
+    v2 = {
+        "__version": 2,
+        "dag": {
+            "default_args": {
+                "__type": "dict",
+                "__var": {
+                    "depends_on_past": False,
+                    "retries": 1,
+                    "retry_delay": {"__type": "timedelta", "__var": 300.0},
+                    "max_retry_delay": {"__type": "timedelta", "__var": 600.0},
+                },
+            },
+            "start_date": 1564617600.0,
+            "timetable": {
+                "__type": "airflow.timetables.interval.DeltaDataIntervalTimetable",
+                "__var": {
+                    "delta": 86400.0,
+                },
+            },
+            "task_group": {
+                "_group_id": None,
+                "group_display_name": "",
+                "prefix_group_id": True,
+                "children": {
+                    "bash_task": ("operator", "bash_task"),
+                    "custom_task": ("operator", "custom_task"),
+                },
+                "tooltip": "",
+                "ui_color": "CornflowerBlue",
+                "ui_fgcolor": "#000",
+                "upstream_group_ids": [],
+                "downstream_group_ids": [],
+                "upstream_task_ids": [],
+                "downstream_task_ids": [],
+            },
+            "is_paused_upon_creation": False,
+            "dag_id": "simple_dag",
+            "catchup": False,
+            "disable_bundle_versioning": False,
+            "doc_md": "### DAG Tutorial Documentation",
+            "fileloc": None,
+            "_processor_dags_folder": (
+                AIRFLOW_REPO_ROOT_PATH / "airflow-core" / "tests" / "unit" / "dags"
+            ).as_posix(),
+            "tasks": [
+                {
+                    "__type": "operator",
+                    "__var": {
+                        "task_id": "bash_task",
+                        "retries": 1,
+                        "retry_delay": 300.0,
+                        "max_retry_delay": 600.0,
+                        "downstream_task_ids": [],
+                        "ui_color": "#f0ede4",
+                        "ui_fgcolor": "#000",
+                        "template_ext": [".sh", ".bash"],
+                        "template_fields": ["bash_command", "env", "cwd"],
+                        "template_fields_renderers": {
+                            "bash_command": "bash",
+                            "env": "json",
+                        },
+                        "bash_command": "echo {{ task.task_id }}",
+                        "task_type": "BashOperator",
+                        "_task_module": "airflow.providers.standard.operators.bash",
+                        "owner": "airflow1",
+                        "pool": "pool1",
+                        "is_setup": False,
+                        "is_teardown": False,
+                        "on_failure_fail_dagrun": False,
+                        "executor_config": {
+                            "__type": "dict",
+                            "__var": {
+                                "pod_override": {
+                                    "__type": "k8s.V1Pod",
+                                    "__var": PodGenerator.serialize_pod(executor_config_pod),
+                                }
+                            },
+                        },
+                        "doc_md": "### Task Tutorial Documentation",
+                        "_needs_expansion": False,
+                        "weight_rule": "downstream",
+                        "start_trigger_args": None,
+                        "start_from_trigger": False,
+                        "inlets": [
+                            {
+                                "__type": "asset",
+                                "__var": {
+                                    "extra": {},
+                                    "group": "asset",
+                                    "name": "asset-1",
+                                    "uri": "asset-1",
+                                },
+                            },
+                            {
+                                "__type": "asset_alias",
+                                "__var": {"group": "asset", "name": "alias-name"},
+                            },
+                        ],
+                        "outlets": [
+                            {
+                                "__type": "asset",
+                                "__var": {
+                                    "extra": {},
+                                    "group": "asset",
+                                    "name": "asset-2",
+                                    "uri": "asset-2",
+                                },
+                            },
+                        ],
+                    },
+                },
+                {
+                    "__type": "operator",
+                    "__var": {
+                        "task_id": "custom_task",
+                        "retries": 1,
+                        "retry_delay": 300.0,
+                        "max_retry_delay": 600.0,
+                        "downstream_task_ids": [],
+                        "_operator_extra_links": {"Google Custom": "_link_CustomOpLink"},
+                        "ui_color": "#fff",
+                        "ui_fgcolor": "#000",
+                        "template_ext": [],
+                        "template_fields": ["bash_command"],
+                        "template_fields_renderers": {},
+                        "task_type": "CustomOperator",
+                        "_operator_name": "@custom",
+                        "_task_module": "tests_common.test_utils.mock_operators",
+                        "pool": "default_pool",
+                        "is_setup": False,
+                        "is_teardown": False,
+                        "on_failure_fail_dagrun": False,
+                        "_needs_expansion": False,
+                        "weight_rule": "downstream",
+                        "start_trigger_args": None,
+                        "start_from_trigger": False,
+                    },
+                },
+            ],
+            "timezone": "UTC",
+            "access_control": {
+                "__type": "dict",
+                "__var": {
+                    "test_role": {
+                        "__type": "dict",
+                        "__var": {
+                            "DAGs": {
+                                "__type": "set",
+                                "__var": [
+                                    permissions.ACTION_CAN_READ,
+                                    permissions.ACTION_CAN_EDIT,
+                                ],
+                            }
+                        },
+                    }
+                },
+            },
+            "edge_info": {},
+            "dag_dependencies": [
+                {
+                    "dependency_id": '{"name": "asset-2", "uri": "asset-2"}',
+                    "dependency_type": "asset",
+                    "label": "asset-2",
+                    "source": "simple_dag",
+                    "target": "asset",
+                },
+            ],
+            "params": [],
+            "tags": [],
+        },
+    }
+
+    # from_dict upgrades the v2 payload in place (version bump only) before deserializing it
+    dag = SerializedDAG.from_dict(v2)
+
+    expected_sdag = copy.deepcopy(serialized_simple_dag_ground_truth)
+    expected = SerializedDAG.from_dict(expected_sdag)
+
+    fields_to_verify = set(vars(expected).keys()) - {
+        "task_group",  # Tested separately
+        "last_loaded",  # Dynamically set to utcnow
+    }
+
+    for f in fields_to_verify:
+        dag_value = getattr(dag, f)
+        expected_value = getattr(expected, f)
+
+        assert dag_value == expected_value, (
+            f"V2 DAG field '{f}' differs from V3: V2={dag_value!r} != V3={expected_value!r}"
+        )
+
+    for f in set(vars(expected.task_group).keys()) - {"dag"}:
+        dag_tg_value = getattr(dag.task_group, f)
+        expected_tg_value = getattr(expected.task_group, f)
+
+        assert dag_tg_value == expected_tg_value, (
+            f"V2 task_group field '{f}' differs: V2={dag_tg_value!r} != V3={expected_tg_value!r}"
+        )
+
+
 def test_email_optimization_removes_email_attrs_when_email_empty():
     """Test that email_on_failure and email_on_retry are removed when email is 
empty."""
     with DAG(dag_id="test_email_optimization") as dag:

Reply via email to