This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 59676b0acc5ac4db916f9b843bfb980ff4e3ca2d Author: Kaxil Naik <[email protected]> AuthorDate: Thu Sep 18 16:28:48 2025 +0100 Bump DAG Serialization version to 3 (#55836) Increments `SERIALIZER_VERSION` from 2 to 3 to reflect significant changes to parameter serialization/deserialization logic as part of SDK decoupling work. While changes maintain backward compatibility, Ash & I discussed it would be wise to change the format since it is a significant change and would help in debugging as well. All existing v1 and v2 serialized DAGs remain compatible. (cherry picked from commit a0fb6fee0244463e79726578b54398d6179c6f5f) --- .../dag-serialization.rst | 2 +- .../airflow/serialization/serialized_objects.py | 12 +- .../unit/serialization/test_dag_serialization.py | 217 ++++++++++++++++++++- 3 files changed, 221 insertions(+), 10 deletions(-) diff --git a/airflow-core/docs/administration-and-deployment/dag-serialization.rst b/airflow-core/docs/administration-and-deployment/dag-serialization.rst index 89384f3436b..986821d7e2b 100644 --- a/airflow-core/docs/administration-and-deployment/dag-serialization.rst +++ b/airflow-core/docs/administration-and-deployment/dag-serialization.rst @@ -153,7 +153,7 @@ Serialized Dags now include a ``client_defaults`` section that contains common d .. code-block:: json { - "__version": 2, + "__version": 3, "client_defaults": { "tasks": { "retry_delay": 300.0, diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index caa40ce93dd..4801eda3311 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -616,7 +616,7 @@ class BaseSerialization: _CONSTRUCTOR_PARAMS: dict[str, Parameter] = {} - SERIALIZER_VERSION = 2 + SERIALIZER_VERSION = 3 @classmethod def to_json(cls, var: Any) -> str: @@ -2776,14 +2776,22 @@ class SerializedDAG(BaseSerialization): # Set on the root TG dag_dict["task_group"]["group_display_name"] = "" + @staticmethod + def conversion_v2_to_v3(ser_obj: dict): + # V2 to V3 changes are minimal - mainly client_defaults optimization and + # field presence differences. Only version bump needed. + ser_obj["__version"] = 3 + @classmethod def from_dict(cls, serialized_obj: dict) -> SerializedDAG: """Deserializes a python dict in to the DAG and operators it contains.""" ver = serialized_obj.get("__version", "<not present>") - if ver not in (1, 2): + if ver not in (1, 2, 3): raise ValueError(f"Unsure how to deserialize version {ver!r}") if ver == 1: cls.conversion_v1_to_v2(serialized_obj) + if ver == 2: + cls.conversion_v2_to_v3(serialized_obj) # Extract client_defaults for hierarchical defaults resolution client_defaults = serialized_obj.get("client_defaults", {}) diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index 5ff0a42df49..4ed84c32fbe 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -116,7 +116,7 @@ executor_config_pod = k8s.V1Pod( TYPE = Encoding.TYPE VAR = Encoding.VAR serialized_simple_dag_ground_truth = { - "__version": 2, + "__version": 3, "client_defaults": {"tasks": {"retry_delay": 300.0}}, "dag": { "default_args": { @@ -957,7 +957,7 @@ class TestStringifiedDAGs: expected_timetable, ): serialized = { - "__version": 2, + "__version": 3, "dag": { "default_args": {"__type": "dict", "__var": {}}, "dag_id": "simple_dag", @@ -973,7 +973,7 @@ class TestStringifiedDAGs: def test_deserialization_timetable_unregistered(self): serialized = { - "__version": 2, + "__version": 3, "dag": { "default_args": {"__type": "dict", "__var": {}}, "dag_id": "simple_dag", @@ -2242,7 +2242,7 @@ class TestStringifiedDAGs: def test_params_upgrade(self): """When pre-2.2.0 param (i.e. primitive) is deserialized we convert to Param""" serialized = { - "__version": 2, + "__version": 3, "dag": { "dag_id": "simple_dag", "fileloc": "/path/to/file.py", @@ -2263,7 +2263,7 @@ class TestStringifiedDAGs: This test asserts that the params are still deserialized properly. """ serialized = { - "__version": 2, + "__version": 3, "dag": { "dag_id": "simple_dag", "fileloc": "/path/to/file.py", @@ -2290,7 +2290,7 @@ class TestStringifiedDAGs: test only to ensure that params stored in 2.2.0 can still be parsed correctly. """ serialized = { - "__version": 2, + "__version": 3, "dag": { "dag_id": "simple_dag", "fileloc": "/path/to/file.py", @@ -2307,7 +2307,7 @@ class TestStringifiedDAGs: def test_params_serialize_default(self): serialized = { - "__version": 2, + "__version": 3, "dag": { "dag_id": "simple_dag", "fileloc": "/path/to/file.py", @@ -3297,6 +3297,7 @@ def test_handle_v1_serdag(): ] SerializedDAG.conversion_v1_to_v2(v1) + SerializedDAG.conversion_v2_to_v3(v1) # Update a few subtle differences v1["dag"]["tags"] = [] @@ -3311,6 +3312,208 @@ def test_handle_v1_serdag(): assert v1 == expected +def test_handle_v2_serdag(): + """Test that v2 serialized DAGs can be deserialized properly.""" + v2 = { + "__version": 2, + "dag": { + "default_args": { + "__type": "dict", + "__var": { + "depends_on_past": False, + "retries": 1, + "retry_delay": {"__type": "timedelta", "__var": 300.0}, + "max_retry_delay": {"__type": "timedelta", "__var": 600.0}, + }, + }, + "start_date": 1564617600.0, + "timetable": { + "__type": "airflow.timetables.interval.DeltaDataIntervalTimetable", + "__var": { + "delta": 86400.0, + }, + }, + "task_group": { + "_group_id": None, + "group_display_name": "", + "prefix_group_id": True, + "children": { + "bash_task": ("operator", "bash_task"), + "custom_task": ("operator", "custom_task"), + }, + "tooltip": "", + "ui_color": "CornflowerBlue", + "ui_fgcolor": "#000", + "upstream_group_ids": [], + "downstream_group_ids": [], + "upstream_task_ids": [], + "downstream_task_ids": [], + }, + "is_paused_upon_creation": False, + "dag_id": "simple_dag", + "catchup": False, + "disable_bundle_versioning": False, + "doc_md": "### DAG Tutorial Documentation", + "fileloc": None, + "_processor_dags_folder": ( + AIRFLOW_REPO_ROOT_PATH / "airflow-core" / "tests" / "unit" / "dags" + ).as_posix(), + "tasks": [ + { + "__type": "operator", + "__var": { + "task_id": "bash_task", + "retries": 1, + "retry_delay": 300.0, + "max_retry_delay": 600.0, + "downstream_task_ids": [], + "ui_color": "#f0ede4", + "ui_fgcolor": "#000", + "template_ext": [".sh", ".bash"], + "template_fields": ["bash_command", "env", "cwd"], + "template_fields_renderers": { + "bash_command": "bash", + "env": "json", + }, + "bash_command": "echo {{ task.task_id }}", + "task_type": "BashOperator", + "_task_module": "airflow.providers.standard.operators.bash", + "owner": "airflow1", + "pool": "pool1", + "is_setup": False, + "is_teardown": False, + "on_failure_fail_dagrun": False, + "executor_config": { + "__type": "dict", + "__var": { + "pod_override": { + "__type": "k8s.V1Pod", + "__var": PodGenerator.serialize_pod(executor_config_pod), + } + }, + }, + "doc_md": "### Task Tutorial Documentation", + "_needs_expansion": False, + "weight_rule": "downstream", + "start_trigger_args": None, + "start_from_trigger": False, + "inlets": [ + { + "__type": "asset", + "__var": { + "extra": {}, + "group": "asset", + "name": "asset-1", + "uri": "asset-1", + }, + }, + { + "__type": "asset_alias", + "__var": {"group": "asset", "name": "alias-name"}, + }, + ], + "outlets": [ + { + "__type": "asset", + "__var": { + "extra": {}, + "group": "asset", + "name": "asset-2", + "uri": "asset-2", + }, + }, + ], + }, + }, + { + "__type": "operator", + "__var": { + "task_id": "custom_task", + "retries": 1, + "retry_delay": 300.0, + "max_retry_delay": 600.0, + "downstream_task_ids": [], + "_operator_extra_links": {"Google Custom": "_link_CustomOpLink"}, + "ui_color": "#fff", + "ui_fgcolor": "#000", + "template_ext": [], + "template_fields": ["bash_command"], + "template_fields_renderers": {}, + "task_type": "CustomOperator", + "_operator_name": "@custom", + "_task_module": "tests_common.test_utils.mock_operators", + "pool": "default_pool", + "is_setup": False, + "is_teardown": False, + "on_failure_fail_dagrun": False, + "_needs_expansion": False, + "weight_rule": "downstream", + "start_trigger_args": None, + "start_from_trigger": False, + }, + }, + ], + "timezone": "UTC", + "access_control": { + "__type": "dict", + "__var": { + "test_role": { + "__type": "dict", + "__var": { + "DAGs": { + "__type": "set", + "__var": [ + permissions.ACTION_CAN_READ, + permissions.ACTION_CAN_EDIT, + ], + } + }, + } + }, + }, + "edge_info": {}, + "dag_dependencies": [ + { + "dependency_id": '{"name": "asset-2", "uri": "asset-2"}', + "dependency_type": "asset", + "label": "asset-2", + "source": "simple_dag", + "target": "asset", + }, + ], + "params": [], + "tags": [], + }, + } + + # Test that v2 DAGs can be deserialized without conversion + dag = SerializedDAG.from_dict(v2) + + expected_sdag = copy.deepcopy(serialized_simple_dag_ground_truth) + expected = SerializedDAG.from_dict(expected_sdag) + + fields_to_verify = set(vars(expected).keys()) - { + "task_group", # Tested separately + "last_loaded", # Dynamically set to utcnow + } + + for f in fields_to_verify: + dag_value = getattr(dag, f) + expected_value = getattr(expected, f) + + assert dag_value == expected_value, ( + f"V2 DAG field '{f}' differs from V3: V2={dag_value!r} != V3={expected_value!r}" + ) + + for f in set(vars(expected.task_group).keys()) - {"dag"}: + dag_tg_value = getattr(dag.task_group, f) + expected_tg_value = getattr(expected.task_group, f) + + assert dag_tg_value == expected_tg_value, ( + f"V2 task_group field '{f}' differs: V2={dag_tg_value!r} != V3={expected_tg_value!r}" + ) + + def test_email_optimization_removes_email_attrs_when_email_empty(): """Test that email_on_failure and email_on_retry are removed when email is empty.""" with DAG(dag_id="test_email_optimization") as dag:
