This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bd8941dee9d Bump Serialized DAG to v2 and handle conversion from v1 (#49020)
bd8941dee9d is described below

commit bd8941dee9d7efec433c3e4215124e9e03d1e610
Author: Daniel Standish <[email protected]>
AuthorDate: Mon Apr 14 13:12:41 2025 -0700

    Bump Serialized DAG to v2 and handle conversion from v1 (#49020)
    
    A number of changes have been made to the serialized DAG structure between 2.x
    and 3.0, and with the addition of DAG versioning in 3.0 we can't use our old
    approach of simply force-reserializing all DAGs anymore.
    
    So instead we bump the version record (that has been 1 since it was first
    introduced in 1.10.7, even though there were changes) and upgrade from the
    latest format in Airflow 2.x such that the Webserver will be able to display
    things for it.
    
    The scheduler is not so easy to fix, so we use a proxy of "skip things that
    haven't been re-processed yet" by making the scheduler ignore DAGs that don't
    have a bundle name set (as all DAGs processed in 3.0 have this set, even if
    it is for a local file bundle).
    
    ---------
    
    Co-authored-by: Ephraim Anierobi <[email protected]>
    Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
 .../core_api/datamodels/dag_versions.py            |   6 +-
 .../api_fastapi/core_api/openapi/v1-generated.yaml |   4 +-
 .../airflow/api_fastapi/core_api/routes/ui/grid.py |  10 ++
 .../api_fastapi/core_api/services/ui/grid.py       |  15 +-
 .../src/airflow/jobs/scheduler_job_runner.py       |   1 +
 .../airflow/serialization/serialized_objects.py    | 142 ++++++++++++++++-
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts |   9 +-
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |   2 +-
 .../unit/serialization/test_dag_serialization.py   | 177 ++++++++++++++++++---
 .../src/airflow/sdk/definitions/mappedoperator.py  |   2 +
 10 files changed, 331 insertions(+), 37 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
index fac36b885f3..47a28b75284 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
@@ -31,7 +31,7 @@ class DagVersionResponse(BaseModel):
     id: UUID
     version_number: int
     dag_id: str
-    bundle_name: str
+    bundle_name: str | None
     bundle_version: str | None
     created_at: datetime
 
@@ -39,7 +39,9 @@ class DagVersionResponse(BaseModel):
     @computed_field  # type: ignore[misc]
     @property
     def bundle_url(self) -> str | None:
-        return DagBundlesManager().view_url(self.bundle_name, self.bundle_version)
+        if self.bundle_name:
+            return DagBundlesManager().view_url(self.bundle_name, self.bundle_version)
+        return None
 
 
 class DAGVersionCollectionResponse(BaseModel):
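
With bundle_name now optional, here is a minimal sketch (not from the commit
itself; the field values are hypothetical) of how the computed bundle_url
behaves for a pre-3.0 row, assuming Pydantic v2 as used above:

    from datetime import datetime, timezone
    from uuid import uuid4

    from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse

    # Rows serialized under Airflow 2.x carry no bundle, so bundle_name is None
    # and bundle_url short-circuits instead of querying DagBundlesManager.
    resp = DagVersionResponse(
        id=uuid4(),
        version_number=1,
        dag_id="simple_dag",
        bundle_name=None,
        bundle_version=None,
        created_at=datetime.now(timezone.utc),
    )
    assert resp.bundle_url is None
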
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index ca8ba635582..2e898e370ac 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -9624,7 +9624,9 @@ components:
           type: string
           title: Dag Id
         bundle_name:
-          type: string
+          anyOf:
+          - type: string
+          - type: 'null'
           title: Bundle Name
         bundle_version:
           anyOf:
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index c7c34bac7d1..2b64551f982 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -21,6 +21,7 @@ import collections
 import itertools
 from typing import Annotated
 
+import structlog
 from fastapi import Depends, HTTPException, Request, status
 from sqlalchemy import select
 from sqlalchemy.orm import joinedload
@@ -58,6 +59,7 @@ from airflow.models.dag_version import DagVersion
 from airflow.models.taskinstancehistory import TaskInstanceHistory
 from airflow.utils.state import TaskInstanceState
 
+log = structlog.get_logger(logger_name=__name__)
 grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
 
 
@@ -171,6 +173,14 @@ def grid_data(
                .order_by(DagVersion.id)  # ascending because this is mostly for pre-3.0 upgrade
                 .limit(1)
             )
+        if not version.serialized_dag:
+            log.error(
+                "No serialized dag found",
+                dag_id=tis[0].dag_id,
+                version_id=version.id,
+                version_number=version.version_number,
+            )
+            continue
         run_dag = version.serialized_dag.dag
         task_node_map = get_task_group_map(dag=run_dag)
         for ti in tis:
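
The new guard logs with structlog's key/value style before skipping the run. A
tiny standalone sketch (illustrative values, default structlog configuration):

    import structlog

    log = structlog.get_logger(logger_name=__name__)

    # The event string stays constant; the identifying details travel as
    # structured key/value context rather than being interpolated into it.
    log.error(
        "No serialized dag found",
        dag_id="simple_dag",
        version_id="0195d9f2-...",  # abbreviated for the example
        version_number=1,
    )
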
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
index 8ef2b25012c..f35c4172e88 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
@@ -22,6 +22,7 @@ from operator import methodcaller
 from typing import Callable
 from uuid import UUID
 
+import structlog
 from sqlalchemy import select
 from typing_extensions import Any
 
@@ -47,6 +48,8 @@ from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.utils.state import TaskInstanceState
 from airflow.utils.task_group import task_group_to_dict
 
+log = structlog.get_logger(logger_name=__name__)
+
 
 @cache
 def get_task_group_children_getter() -> Callable:
@@ -275,6 +278,13 @@ def _get_serdag(ti, session):
         )
     if not dag_version:
         raise RuntimeError("No dag_version object could be found.")
+    if not dag_version.serialized_dag:
+        log.error(
+            "No serialized dag found",
+            dag_id=dag_version.dag_id,
+            version_id=dag_version.id,
+            version_number=dag_version.version_number,
+        )
     return dag_version.serialized_dag
 
 
@@ -283,7 +293,10 @@ def get_combined_structure(task_instances, session):
     merged_nodes = []
     # we dedup with serdag, as serdag.dag varies somehow?
     serdags = {_get_serdag(ti, session) for ti in task_instances}
-    dags = [serdag.dag for serdag in serdags]
+    dags = []
+    for serdag in serdags:
+        if serdag:
+            dags.append(serdag.dag)
     for dag in dags:
         nodes = [task_group_to_dict(child) for child in dag.task_group.topological_sort()]
         _merge_node_dicts(merged_nodes, nodes)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index c95303a480c..b5535ea44a6 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -356,6 +356,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 .join(TI.dag_model)
                 .where(~DM.is_paused)
                 .where(TI.state == TaskInstanceState.SCHEDULED)
+                .where(DM.bundle_name.is_not(None))
                 .options(selectinload(TI.dag_model))
                 .order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
             )
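
The added filter leans on SQL NULL semantics: serialized DAGs carried over from
2.x have no bundle recorded, so their rows stay excluded until the dag processor
re-serializes them under 3.0. A standalone sketch (a simplified stand-in model,
not Airflow's real DagModel):

    from sqlalchemy import Column, String, select
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class DM(Base):
        """Simplified stand-in for the DagModel columns used here."""

        __tablename__ = "dag"
        dag_id = Column(String, primary_key=True)
        bundle_name = Column(String, nullable=True)  # NULL for pre-3.0 rows

    # Renders "... WHERE dag.bundle_name IS NOT NULL", so unprocessed 2.x rows
    # never reach the scheduling query.
    print(select(DM).where(DM.bundle_name.is_not(None)))
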
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 6bda483373b..2340c171970 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -582,7 +582,7 @@ class BaseSerialization:
 
     _CONSTRUCTOR_PARAMS: dict[str, Parameter] = {}
 
-    SERIALIZER_VERSION = 1
+    SERIALIZER_VERSION = 2
 
     @classmethod
     def to_json(cls, var: DAG | BaseOperator | dict | list | set | tuple) -> str:
@@ -1137,7 +1137,6 @@ class DependencyDetector:
         """Detect dependencies set directly on the DAG object."""
         if not dag:
             return
-
         yield from dag.timetable.asset_condition.iter_dag_dependencies(source="", target=dag.dag_id)
 
 
@@ -1298,10 +1297,12 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
             serialize_op["_operator_name"] = op.operator_name
 
         # Used to determine if an Operator is inherited from EmptyOperator
-        serialize_op["_is_empty"] = op.inherits_from_empty_operator
+        if op.inherits_from_empty_operator:
+            serialize_op["_is_empty"] = True
 
         # Used to determine if an Operator is inherited from SkipMixin or BranchMixin
-        serialize_op["_can_skip_downstream"] = op.inherits_from_skipmixin
+        if op.inherits_from_skipmixin:
+            serialize_op["_can_skip_downstream"] = True
 
         serialize_op["start_trigger_args"] = (
             encode_start_trigger_args(op.start_trigger_args) if op.start_trigger_args else None
@@ -1789,12 +1790,140 @@ class SerializedDAG(DAG, BaseSerialization):
         cls.validate_schema(json_dict)
         return json_dict
 
+    @classmethod
+    def conversion_v1_to_v2(cls, ser_obj: dict):
+        dag_dict = ser_obj["dag"]
+        dag_renames = [
+            ("_dag_id", "dag_id"),
+            ("_task_group", "task_group"),
+            ("_access_control", "access_control"),
+        ]
+        task_renames = [("_task_type", "task_type")]
+        #
+        tasks_remove = [
+            "_log_config_logger_name",
+            "deps",
+            "sla",
+            # Operator extra links from Airflow 2 won't work anymore, only new ones, so remove these
+            "_operator_extra_links",
+        ]
+
+        ser_obj["__version"] = 2
+
+        def replace_dataset_in_str(s):
+            return s.replace("Dataset", "Asset").replace("dataset", "asset")
+
+        def _replace_dataset_with_asset_in_timetables(obj, parent_key=None):
+            if isinstance(obj, dict):
+                new_obj = {}
+                for k, v in obj.items():
+                    new_key = replace_dataset_in_str(k) if isinstance(k, str) else k
+                    # Don't replace uri values
+                    if new_key == "uri":
+                        new_obj[new_key] = v
+                    else:
+                        new_value = (
+                            replace_dataset_in_str(v)
+                            if isinstance(v, str)
+                            else _replace_dataset_with_asset_in_timetables(v, 
parent_key=new_key)
+                        )
+                        new_obj[new_key] = new_value
+                # Insert "name" and "group" if this is inside the 'objects' list
+                if parent_key == "objects":
+                    new_obj["name"] = None
+                    new_obj["group"] = None
+                return new_obj
+
+            elif isinstance(obj, list):
+                return [_replace_dataset_with_asset_in_timetables(i, parent_key=parent_key) for i in obj]
+
+            return obj
+
+        for old, new in dag_renames:
+            dag_dict[new] = dag_dict.pop(old)
+
+        if default_args := dag_dict.get("default_args"):
+            for k in tasks_remove:
+                default_args["__var"].pop(k, None)
+
+        if sched := dag_dict.pop("schedule_interval", None):
+            if sched is None:
+                dag_dict["timetable"] = {
+                    "__var": {},
+                    "__type": "airflow.timetables.simple.NullTimetable",
+                }
+            elif isinstance(sched, str):
+                # "@daily" etc
+                if sched == "@once":
+                    dag_dict["timetable"] = {
+                        "__var": {},
+                        "__type": "airflow.timetables.simple.OnceTimetable",
+                    }
+                elif sched == "@continuous":
+                    dag_dict["timetable"] = {
+                        "__var": {},
+                        "__type": 
"airflow.timetables.simple.ContinuousTimetable",
+                    }
+                elif sched == "@daily":
+                    dag_dict["timetable"] = {
+                        "__var": {
+                            "interval": 0.0,
+                            "timezone": "UTC",
+                            "expression": "0 0 * * *",
+                            "run_immediately": False,
+                        },
+                        "__type": 
"airflow.timetables.trigger.CronTriggerTimetable",
+                    }
+                else:
+                    # We should maybe convert this to None and warn instead
+                    raise ValueError(f"Unknown schedule_interval field {sched!r}")
+            elif sched.get("__type") == "timedelta":
+                dag_dict["timetable"] = {
+                    "__type": 
"airflow.timetables.interval.DeltaDataIntervalTimetable",
+                    "__var": {"delta": sched["__var"]},
+                }
+        elif timetable := dag_dict.get("timetable"):
+            if timetable["__type"] in {
+                "airflow.timetables.simple.DatasetTriggeredTimetable",
+                "airflow.timetables.datasets.DatasetOrTimeSchedule",
+            }:
+                dag_dict["timetable"] = 
_replace_dataset_with_asset_in_timetables(dag_dict["timetable"])
+
+        if "dag_dependencies" in dag_dict:
+            for dep in dag_dict["dag_dependencies"]:
+                for fld in ("dependency_type", "target", "source"):
+                    if dep.get(fld) == "dataset":
+                        dep[fld] = "asset"
+
+        for task in dag_dict["tasks"]:
+            task_var: dict = task["__var"]
+            if "airflow.ti_deps.deps.ready_to_reschedule.ReadyToRescheduleDep" 
in task_var.get("deps", []):
+                task_var["_is_sensor"] = True
+            for k in tasks_remove:
+                task_var.pop(k, None)
+            for old, new in task_renames:
+                task_var[new] = task_var.pop(old)
+            for item in task_var.get("outlets", []):
+                if isinstance(item, dict) and "__type" in item:
+                    item["__type"] = replace_dataset_in_str(item["__type"])
+            for item in task_var.get("inlets", []):
+                if isinstance(item, dict) and "__type" in item:
+                    item["__type"] = replace_dataset_in_str(item["__type"])
+                var_ = item["__var"]
+                var_["name"] = None
+                var_["group"] = None
+
+        # Set on the root TG
+        dag_dict["task_group"]["group_display_name"] = ""
+
     @classmethod
     def from_dict(cls, serialized_obj: dict) -> SerializedDAG:
         """Deserializes a python dict in to the DAG and operators it 
contains."""
         ver = serialized_obj.get("__version", "<not present>")
-        if ver != cls.SERIALIZER_VERSION:
+        if ver not in (1, 2):
             raise ValueError(f"Unsure how to deserialize version {ver!r}")
+        if ver == 1:
+            cls.conversion_v1_to_v2(serialized_obj)
         return cls.deserialize_dag(serialized_obj["dag"])
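
One recurring upgrade step is the Dataset-to-Asset rename from Airflow 3. A
standalone sketch of the string rewrite (mirroring replace_dataset_in_str above;
the sample values are illustrative):

    def replace_dataset_in_str(s: str) -> str:
        # Rewrites stored type paths and keys; URI values are deliberately
        # left untouched by the caller, as in the conversion above.
        return s.replace("Dataset", "Asset").replace("dataset", "asset")

    assert (
        replace_dataset_in_str("airflow.timetables.simple.DatasetTriggeredTimetable")
        == "airflow.timetables.simple.AssetTriggeredTimetable"
    )
    assert replace_dataset_in_str("dataset_triggered") == "asset_triggered"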
 
 
@@ -1848,8 +1977,9 @@ class TaskGroupSerialization(BaseSerialization):
         group_id = cls.deserialize(encoded_group["_group_id"])
         kwargs = {
             key: cls.deserialize(encoded_group[key])
-            for key in ["prefix_group_id", "tooltip", "ui_color", "ui_fgcolor", "group_display_name"]
+            for key in ["prefix_group_id", "tooltip", "ui_color", "ui_fgcolor"]
         }
+        kwargs["group_display_name"] = 
cls.deserialize(encoded_group.get("group_display_name", ""))
 
         if not encoded_group.get("is_mapped"):
             group = TaskGroup(group_id=group_id, parent_group=parent_group, dag=dag, **kwargs)
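
Moving group_display_name out of the required-key list matters for v1 payloads,
which never wrote that key. A small sketch (abbreviated v1-style dict of assumed
shape) of why direct indexing would fail where .get() succeeds:

    # v1 task groups have no "group_display_name" entry, so the previous
    # encoded_group[key] lookup would raise KeyError during deserialization.
    encoded_group = {"_group_id": None, "prefix_group_id": True, "tooltip": ""}
    display_name = encoded_group.get("group_display_name", "")
    assert display_name == ""
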
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 205e851a9f4..5e1598a153a 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -3407,7 +3407,14 @@ export const $DagVersionResponse = {
       title: "Dag Id",
     },
     bundle_name: {
-      type: "string",
+      anyOf: [
+        {
+          type: "string",
+        },
+        {
+          type: "null",
+        },
+      ],
       title: "Bundle Name",
     },
     bundle_version: {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 72707105751..834816afa63 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -908,7 +908,7 @@ export type DagVersionResponse = {
   id: string;
   version_number: number;
   dag_id: string;
-  bundle_name: string;
+  bundle_name: string | null;
   bundle_version: string | null;
   created_at: string;
   readonly bundle_url: string | null;
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index f70cd7e5338..fb2f3d8f1f7 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -122,7 +122,7 @@ executor_config_pod = k8s.V1Pod(
 TYPE = Encoding.TYPE
 VAR = Encoding.VAR
 serialized_simple_dag_ground_truth = {
-    "__version": 1,
+    "__version": 2,
     "dag": {
         "default_args": {
             "__type": "dict",
@@ -174,8 +174,6 @@ serialized_simple_dag_ground_truth = {
                     "retry_delay": 300.0,
                     "max_retry_delay": 600.0,
                     "downstream_task_ids": [],
-                    "_is_empty": False,
-                    "_can_skip_downstream": False,
                     "ui_color": "#f0ede4",
                     "ui_fgcolor": "#000",
                     "template_ext": [".sh", ".bash"],
@@ -215,8 +213,6 @@ serialized_simple_dag_ground_truth = {
                     "retry_delay": 300.0,
                     "max_retry_delay": 600.0,
                     "downstream_task_ids": [],
-                    "_is_empty": False,
-                    "_can_skip_downstream": False,
                     "_operator_extra_links": {"Google Custom": 
"_link_CustomOpLink"},
                     "ui_color": "#fff",
                     "ui_fgcolor": "#000",
@@ -569,6 +565,8 @@ class TestStringifiedDAGs:
             )
             return dag_dict
 
+        expected = copy.deepcopy(expected)
+
         # by roundtripping to json we get a cleaner diff
         # if not doing this, we get false alarms such as "__var" != VAR
         actual = json.loads(json.dumps(sorted_serialized_dag(actual)))
@@ -917,7 +915,7 @@ class TestStringifiedDAGs:
         expected_timetable,
     ):
         serialized = {
-            "__version": 1,
+            "__version": 2,
             "dag": {
                 "default_args": {"__type": "dict", "__var": {}},
                 "dag_id": "simple_dag",
@@ -933,7 +931,7 @@ class TestStringifiedDAGs:
 
     def test_deserialization_timetable_unregistered(self):
         serialized = {
-            "__version": 1,
+            "__version": 2,
             "dag": {
                 "default_args": {"__type": "dict", "__var": {}},
                 "dag_id": "simple_dag",
@@ -2199,7 +2197,7 @@ class TestStringifiedDAGs:
     def test_params_upgrade(self):
         """When pre-2.2.0 param (i.e. primitive) is deserialized we convert to 
Param"""
         serialized = {
-            "__version": 1,
+            "__version": 2,
             "dag": {
                 "dag_id": "simple_dag",
                 "fileloc": "/path/to/file.py",
@@ -2220,7 +2218,7 @@ class TestStringifiedDAGs:
         This test asserts that the params are still deserialized properly.
         """
         serialized = {
-            "__version": 1,
+            "__version": 2,
             "dag": {
                 "dag_id": "simple_dag",
                 "fileloc": "/path/to/file.py",
@@ -2247,7 +2245,7 @@ class TestStringifiedDAGs:
         test only to ensure that params stored in 2.2.0 can still be parsed correctly.
         """
         serialized = {
-            "__version": 1,
+            "__version": 2,
             "dag": {
                 "dag_id": "simple_dag",
                 "fileloc": "/path/to/file.py",
@@ -2264,7 +2262,7 @@ class TestStringifiedDAGs:
 
     def test_params_serialize_default(self):
         serialized = {
-            "__version": 1,
+            "__version": 2,
             "dag": {
                 "dag_id": "simple_dag",
                 "fileloc": "/path/to/file.py",
@@ -2447,9 +2445,7 @@ def test_operator_expand_serde():
     serialized = BaseSerialization.serialize(real_op)
 
     assert serialized["__var"] == {
-        "_is_empty": False,
         "_is_mapped": True,
-        "_can_skip_downstream": False,
         "_task_module": "airflow.providers.standard.operators.bash",
         "task_type": "BashOperator",
         "start_trigger_args": None,
@@ -2509,9 +2505,7 @@ def test_operator_expand_xcomarg_serde():
 
     serialized = BaseSerialization.serialize(mapped)
     assert serialized["__var"] == {
-        "_is_empty": False,
         "_is_mapped": True,
-        "_can_skip_downstream": False,
         "_task_module": "tests_common.test_utils.mock_operators",
         "task_type": "MockOperator",
         "downstream_task_ids": [],
@@ -2568,9 +2562,7 @@ def test_operator_expand_kwargs_literal_serde(strict):
 
     serialized = BaseSerialization.serialize(mapped)
     assert serialized["__var"] == {
-        "_is_empty": False,
         "_is_mapped": True,
-        "_can_skip_downstream": False,
         "_task_module": "tests_common.test_utils.mock_operators",
         "task_type": "MockOperator",
         "downstream_task_ids": [],
@@ -2635,9 +2627,7 @@ def test_operator_expand_kwargs_xcomarg_serde(strict):
 
     serialized = SerializedBaseOperator.serialize(mapped)
     assert serialized["__var"] == {
-        "_is_empty": False,
         "_is_mapped": True,
-        "_can_skip_downstream": False,
         "_task_module": "tests_common.test_utils.mock_operators",
         "task_type": "MockOperator",
         "downstream_task_ids": [],
@@ -2785,9 +2775,7 @@ def test_taskflow_expand_serde():
 
     serialized = BaseSerialization.serialize(original)
     assert serialized["__var"] == {
-        "_is_empty": False,
         "_is_mapped": True,
-        "_can_skip_downstream": False,
         "_task_module": "airflow.providers.standard.decorators.python",
         "task_type": "_PythonDecoratedOperator",
         "_operator_name": "@task",
@@ -2900,9 +2888,7 @@ def test_taskflow_expand_kwargs_serde(strict):
 
     serialized = BaseSerialization.serialize(original)
     assert serialized["__var"] == {
-        "_is_empty": False,
         "_is_mapped": True,
-        "_can_skip_downstream": False,
         "_task_module": "airflow.providers.standard.decorators.python",
         "task_type": "_PythonDecoratedOperator",
         "_operator_name": "@task",
@@ -3065,9 +3051,7 @@ def test_mapped_task_with_operator_extra_links_property():
         "template_fields_renderers": {},
         "task_type": "_DummyOperator",
         "_task_module": "unit.serialization.test_dag_serialization",
-        "_is_empty": False,
         "_is_mapped": True,
-        "_can_skip_downstream": False,
         "start_trigger_args": None,
         "start_from_trigger": False,
     }
@@ -3076,3 +3060,146 @@ def test_mapped_task_with_operator_extra_links_property():
     assert deserialized_dag.task_dict["task"].operator_extra_links == [
         XComOperatorLink(name="airflow", xcom_key="_link_AirflowLink2")
     ]
+
+
+def test_handle_v1_serdag():
+    v1 = {
+        "__version": 1,
+        "dag": {
+            "default_args": {
+                "__type": "dict",
+                "__var": {
+                    "depends_on_past": False,
+                    "retries": 1,
+                    "retry_delay": {"__type": "timedelta", "__var": 300.0},
+                    "max_retry_delay": {"__type": "timedelta", "__var": 600.0},
+                    "sla": {"__type": "timedelta", "__var": 100.0},
+                },
+            },
+            "start_date": 1564617600.0,
+            "_task_group": {
+                "_group_id": None,
+                "prefix_group_id": True,
+                "children": {
+                    "bash_task": ("operator", "bash_task"),
+                    "custom_task": ("operator", "custom_task"),
+                },
+                "tooltip": "",
+                "ui_color": "CornflowerBlue",
+                "ui_fgcolor": "#000",
+                "upstream_group_ids": [],
+                "downstream_group_ids": [],
+                "upstream_task_ids": [],
+                "downstream_task_ids": [],
+            },
+            "is_paused_upon_creation": False,
+            "_dag_id": "simple_dag",
+            "doc_md": "### DAG Tutorial Documentation",
+            "fileloc": None,
+            "_processor_dags_folder": (
+                AIRFLOW_REPO_ROOT_PATH / "airflow-core" / "tests" / "unit" / "dags"
+            ).as_posix(),
+            "tasks": [
+                {
+                    "__type": "operator",
+                    "__var": {
+                        "task_id": "bash_task",
+                        "retries": 1,
+                        "retry_delay": 300.0,
+                        "max_retry_delay": 600.0,
+                        "sla": 100.0,
+                        "downstream_task_ids": [],
+                        "ui_color": "#f0ede4",
+                        "ui_fgcolor": "#000",
+                        "template_ext": [".sh", ".bash"],
+                        "template_fields": ["bash_command", "env", "cwd"],
+                        "template_fields_renderers": {"bash_command": "bash", 
"env": "json"},
+                        "bash_command": "echo {{ task.task_id }}",
+                        "_task_type": "BashOperator",
+                        # Slight difference from v2-10-stable here, we manually changed this path
+                        "_task_module": 
"airflow.providers.standard.operators.bash",
+                        "pool": "default_pool",
+                        "is_setup": False,
+                        "is_teardown": False,
+                        "on_failure_fail_dagrun": False,
+                        "executor_config": {
+                            "__type": "dict",
+                            "__var": {
+                                "pod_override": {
+                                    "__type": "k8s.V1Pod",
+                                    "__var": 
PodGenerator.serialize_pod(executor_config_pod),
+                                }
+                            },
+                        },
+                        "doc_md": "### Task Tutorial Documentation",
+                        "_log_config_logger_name": "airflow.task.operators",
+                        "_needs_expansion": False,
+                        "weight_rule": "downstream",
+                        "start_trigger_args": None,
+                        "start_from_trigger": False,
+                    },
+                },
+                {
+                    "__type": "operator",
+                    "__var": {
+                        "task_id": "custom_task",
+                        "retries": 1,
+                        "retry_delay": 300.0,
+                        "max_retry_delay": 600.0,
+                        "sla": 100.0,
+                        "downstream_task_ids": [],
+                        "_operator_extra_links": 
[{"tests.test_utils.mock_operators.CustomOpLink": {}}],
+                        "ui_color": "#fff",
+                        "ui_fgcolor": "#000",
+                        "template_ext": [],
+                        "template_fields": ["bash_command"],
+                        "template_fields_renderers": {},
+                        "_task_type": "CustomOperator",
+                        "_operator_name": "@custom",
+                        # Slight difference from v2-10-stable here, we manually changed this path
+                        "_task_module": 
"tests_common.test_utils.mock_operators",
+                        "pool": "default_pool",
+                        "is_setup": False,
+                        "is_teardown": False,
+                        "on_failure_fail_dagrun": False,
+                        "_log_config_logger_name": "airflow.task.operators",
+                        "_needs_expansion": False,
+                        "weight_rule": "downstream",
+                        "start_trigger_args": None,
+                        "start_from_trigger": False,
+                    },
+                },
+            ],
+            "schedule_interval": {"__type": "timedelta", "__var": 86400.0},
+            "timezone": "UTC",
+            "_access_control": {
+                "__type": "dict",
+                "__var": {
+                    "test_role": {
+                        "__type": "dict",
+                        "__var": {
+                            "DAGs": {
+                                "__type": "set",
+                                "__var": [permissions.ACTION_CAN_READ, 
permissions.ACTION_CAN_EDIT],
+                            }
+                        },
+                    }
+                },
+            },
+            "edge_info": {},
+            "dag_dependencies": [],
+            "params": [],
+        },
+    }
+
+    SerializedDAG.conversion_v1_to_v2(v1)
+
+    # Update a few subtle differences
+    v1["dag"]["tags"] = []
+    v1["dag"]["catchup"] = False
+    v1["dag"]["disable_bundle_versioning"] = False
+
+    expected = copy.deepcopy(serialized_simple_dag_ground_truth)
+    del expected["dag"]["tasks"][1]["__var"]["_operator_extra_links"]
+
+    assert v1 == expected
diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
index 2284d388567..b2e3baaeccf 100644
--- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
@@ -357,6 +357,8 @@ class MappedOperator(AbstractOperator):
     def get_serialized_fields(cls):
         # Not using 'cls' here since we only want to serialize base fields.
         return (frozenset(attrs.fields_dict(MappedOperator)) | {"task_type"}) - {
+            "_is_empty",
+            "_can_skip_downstream",
             "_task_type",
             "dag",
             "deps",
