This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bd8941dee9d Bump Serialized DAG to v2 and handle conversion from v1 (#49020)
bd8941dee9d is described below
commit bd8941dee9d7efec433c3e4215124e9e03d1e610
Author: Daniel Standish <[email protected]>
AuthorDate: Mon Apr 14 13:12:41 2025 -0700
Bump Serialized DAG to v2 and handle conversion from v1 (#49020)
A number of changes have been made to the serialized DAG structure between 2.x
and 3.0, and with the addition of DAG versioning in 3.0 we can't use our old
approach of simply force-reserializing all DAGs anymore.
So instead we bump the version record (which has stayed at 1 since it was
first introduced in 1.10.7, even though the format changed) and upgrade from
the latest Airflow 2.x format, so that the Webserver is able to display those
DAGs.
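For illustration only (not part of this patch), the upgrade is an in-place
mutation of the stored document before normal deserialization. A minimal
sketch, using a toy payload that omits most of the fields a real v1 document
carries:

    from airflow.serialization.serialized_objects import SerializedDAG

    # Toy v1 payload: just enough keys for the converter to run end to end.
    v1 = {
        "__version": 1,
        "dag": {
            "_dag_id": "example",          # renamed to "dag_id"
            "_task_group": {},             # renamed to "task_group"
            "_access_control": None,       # renamed to "access_control"
            "tasks": [],
            "schedule_interval": "@once",  # replaced by an explicit timetable
        },
    }

    SerializedDAG.conversion_v1_to_v2(v1)
    assert v1["__version"] == 2
    assert v1["dag"]["timetable"]["__type"] == "airflow.timetables.simple.OnceTimetable"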
The scheduler is not so easy to fix, so we use a proxy of "skip things that
haven't been re-processed yet" by making the scheduler ignore DAGs that don't
have a bundle name set (as all DAGs processed in 3.0 have this set, even if it
is for a local file bundle).
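As a sketch of that proxy (again illustrative, not part of this patch), the
gate amounts to one extra predicate on the scheduling query: rows whose
DagModel has a NULL bundle_name were last serialized by Airflow 2.x and are
skipped until the DAG processor re-parses them.

    from sqlalchemy import select

    from airflow.models.dag import DagModel
    from airflow.models.taskinstance import TaskInstance
    from airflow.utils.state import TaskInstanceState

    # Simplified form of the scheduler's query; the real one also joins the
    # DagRun, filters out paused DAGs, and orders by priority weight.
    query = (
        select(TaskInstance)
        .join(TaskInstance.dag_model)
        .where(TaskInstance.state == TaskInstanceState.SCHEDULED)
        .where(DagModel.bundle_name.is_not(None))  # the new "re-processed" gate
    )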
---------
Co-authored-by: Ephraim Anierobi <[email protected]>
Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
.../core_api/datamodels/dag_versions.py | 6 +-
.../api_fastapi/core_api/openapi/v1-generated.yaml | 4 +-
.../airflow/api_fastapi/core_api/routes/ui/grid.py | 10 ++
.../api_fastapi/core_api/services/ui/grid.py | 15 +-
.../src/airflow/jobs/scheduler_job_runner.py | 1 +
.../airflow/serialization/serialized_objects.py | 142 ++++++++++++++++-
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 9 +-
.../airflow/ui/openapi-gen/requests/types.gen.ts | 2 +-
.../unit/serialization/test_dag_serialization.py | 177 ++++++++++++++++++---
.../src/airflow/sdk/definitions/mappedoperator.py | 2 +
10 files changed, 331 insertions(+), 37 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
index fac36b885f3..47a28b75284 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
@@ -31,7 +31,7 @@ class DagVersionResponse(BaseModel):
id: UUID
version_number: int
dag_id: str
- bundle_name: str
+ bundle_name: str | None
bundle_version: str | None
created_at: datetime
@@ -39,7 +39,9 @@ class DagVersionResponse(BaseModel):
@computed_field # type: ignore[misc]
@property
def bundle_url(self) -> str | None:
- return DagBundlesManager().view_url(self.bundle_name, self.bundle_version)
+ if self.bundle_name:
+ return DagBundlesManager().view_url(self.bundle_name, self.bundle_version)
+ return None
class DAGVersionCollectionResponse(BaseModel):
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index ca8ba635582..2e898e370ac 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -9624,7 +9624,9 @@ components:
type: string
title: Dag Id
bundle_name:
- type: string
+ anyOf:
+ - type: string
+ - type: 'null'
title: Bundle Name
bundle_version:
anyOf:
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index c7c34bac7d1..2b64551f982 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -21,6 +21,7 @@ import collections
import itertools
from typing import Annotated
+import structlog
from fastapi import Depends, HTTPException, Request, status
from sqlalchemy import select
from sqlalchemy.orm import joinedload
@@ -58,6 +59,7 @@ from airflow.models.dag_version import DagVersion
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.utils.state import TaskInstanceState
+log = structlog.get_logger(logger_name=__name__)
grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
@@ -171,6 +173,14 @@ def grid_data(
.order_by(DagVersion.id) # ascending cus this is mostly for pre-3.0 upgrade
.limit(1)
)
+ if not version.serialized_dag:
+ log.error(
+ "No serialized dag found",
+ dag_id=tis[0].dag_id,
+ version_id=version.id,
+ version_number=version.version_number,
+ )
+ continue
run_dag = version.serialized_dag.dag
task_node_map = get_task_group_map(dag=run_dag)
for ti in tis:
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
index 8ef2b25012c..f35c4172e88 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
@@ -22,6 +22,7 @@ from operator import methodcaller
from typing import Callable
from uuid import UUID
+import structlog
from sqlalchemy import select
from typing_extensions import Any
@@ -47,6 +48,8 @@ from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils.state import TaskInstanceState
from airflow.utils.task_group import task_group_to_dict
+log = structlog.get_logger(logger_name=__name__)
+
@cache
def get_task_group_children_getter() -> Callable:
@@ -275,6 +278,13 @@ def _get_serdag(ti, session):
)
if not dag_version:
raise RuntimeError("No dag_version object could be found.")
+ if not dag_version.serialized_dag:
+ log.error(
+ "No serialized dag found",
+ dag_id=dag_version.dag_id,
+ version_id=dag_version.id,
+ version_number=dag_version.version_number,
+ )
return dag_version.serialized_dag
@@ -283,7 +293,10 @@ def get_combined_structure(task_instances, session):
merged_nodes = []
# we dedup with serdag, as serdag.dag varies somehow?
serdags = {_get_serdag(ti, session) for ti in task_instances}
- dags = [serdag.dag for serdag in serdags]
+ dags = []
+ for serdag in serdags:
+ if serdag:
+ dags.append(serdag.dag)
for dag in dags:
nodes = [task_group_to_dict(child) for child in dag.task_group.topological_sort()]
_merge_node_dicts(merged_nodes, nodes)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index c95303a480c..b5535ea44a6 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -356,6 +356,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
.join(TI.dag_model)
.where(~DM.is_paused)
.where(TI.state == TaskInstanceState.SCHEDULED)
+ .where(DM.bundle_name.is_not(None))
.options(selectinload(TI.dag_model))
.order_by(-TI.priority_weight, DR.logical_date, TI.map_index)
)
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 6bda483373b..2340c171970 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -582,7 +582,7 @@ class BaseSerialization:
_CONSTRUCTOR_PARAMS: dict[str, Parameter] = {}
- SERIALIZER_VERSION = 1
+ SERIALIZER_VERSION = 2
@classmethod
def to_json(cls, var: DAG | BaseOperator | dict | list | set | tuple) -> str:
@@ -1137,7 +1137,6 @@ class DependencyDetector:
"""Detect dependencies set directly on the DAG object."""
if not dag:
return
-
yield from dag.timetable.asset_condition.iter_dag_dependencies(source="", target=dag.dag_id)
@@ -1298,10 +1297,12 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
serialize_op["_operator_name"] = op.operator_name
# Used to determine if an Operator is inherited from EmptyOperator
- serialize_op["_is_empty"] = op.inherits_from_empty_operator
+ if op.inherits_from_empty_operator:
+ serialize_op["_is_empty"] = True
# Used to determine if an Operator is inherited from SkipMixin or BranchMixin
- serialize_op["_can_skip_downstream"] = op.inherits_from_skipmixin
+ if op.inherits_from_skipmixin:
+ serialize_op["_can_skip_downstream"] = True
serialize_op["start_trigger_args"] = (
encode_start_trigger_args(op.start_trigger_args) if op.start_trigger_args else None
@@ -1789,12 +1790,140 @@ class SerializedDAG(DAG, BaseSerialization):
cls.validate_schema(json_dict)
return json_dict
+ @classmethod
+ def conversion_v1_to_v2(cls, ser_obj: dict):
+ dag_dict = ser_obj["dag"]
+ dag_renames = [
+ ("_dag_id", "dag_id"),
+ ("_task_group", "task_group"),
+ ("_access_control", "access_control"),
+ ]
+ task_renames = [("_task_type", "task_type")]
+ #
+ tasks_remove = [
+ "_log_config_logger_name",
+ "deps",
+ "sla",
+ # Operator extra links from Airflow 2 won't work anymore, only new ones, so remove these
+ "_operator_extra_links",
+ ]
+
+ ser_obj["__version"] = 2
+
+ def replace_dataset_in_str(s):
+ return s.replace("Dataset", "Asset").replace("dataset", "asset")
+
+ def _replace_dataset_with_asset_in_timetables(obj, parent_key=None):
+ if isinstance(obj, dict):
+ new_obj = {}
+ for k, v in obj.items():
+ new_key = replace_dataset_in_str(k) if isinstance(k, str) else k
+ # Don't replace uri values
+ if new_key == "uri":
+ new_obj[new_key] = v
+ else:
+ new_value = (
+ replace_dataset_in_str(v)
+ if isinstance(v, str)
+ else _replace_dataset_with_asset_in_timetables(v, parent_key=new_key)
+ )
+ new_obj[new_key] = new_value
+ # Insert "name" and "group" if this is inside the 'objects'
list
+ if parent_key == "objects":
+ new_obj["name"] = None
+ new_obj["group"] = None
+ return new_obj
+
+ elif isinstance(obj, list):
+ return [_replace_dataset_with_asset_in_timetables(i, parent_key=parent_key) for i in obj]
+
+ return obj
+
+ for old, new in dag_renames:
+ dag_dict[new] = dag_dict.pop(old)
+
+ if default_args := dag_dict.get("default_args"):
+ for k in tasks_remove:
+ default_args["__var"].pop(k, None)
+
+ if sched := dag_dict.pop("schedule_interval", None):
+ if sched is None:
+ dag_dict["timetable"] = {
+ "__var": {},
+ "__type": "airflow.timetables.simple.NullTimetable",
+ }
+ elif isinstance(sched, str):
+ # "@daily" etc
+ if sched == "@once":
+ dag_dict["timetable"] = {
+ "__var": {},
+ "__type": "airflow.timetables.simple.OnceTimetable",
+ }
+ elif sched == "@continuous":
+ dag_dict["timetable"] = {
+ "__var": {},
+ "__type":
"airflow.timetables.simple.ContinuousTimetable",
+ }
+ elif sched == "@daily":
+ dag_dict["timetable"] = {
+ "__var": {
+ "interval": 0.0,
+ "timezone": "UTC",
+ "expression": "0 0 * * *",
+ "run_immediately": False,
+ },
+ "__type":
"airflow.timetables.trigger.CronTriggerTimetable",
+ }
+ else:
+ # We should maybe convert this to None and warn instead
+ raise ValueError(f"Unknown schedule_interval field {sched!r}")
+ elif sched.get("__type") == "timedelta":
+ dag_dict["timetable"] = {
+ "__type":
"airflow.timetables.interval.DeltaDataIntervalTimetable",
+ "__var": {"delta": sched["__var"]},
+ }
+ elif timetable := dag_dict.get("timetable"):
+ if timetable["__type"] in {
+ "airflow.timetables.simple.DatasetTriggeredTimetable",
+ "airflow.timetables.datasets.DatasetOrTimeSchedule",
+ }:
+ dag_dict["timetable"] =
_replace_dataset_with_asset_in_timetables(dag_dict["timetable"])
+
+ if "dag_dependencies" in dag_dict:
+ for dep in dag_dict["dag_dependencies"]:
+ for fld in ("dependency_type", "target", "source"):
+ if dep.get(fld) == "dataset":
+ dep[fld] = "asset"
+
+ for task in dag_dict["tasks"]:
+ task_var: dict = task["__var"]
+ if "airflow.ti_deps.deps.ready_to_reschedule.ReadyToRescheduleDep"
in task_var.get("deps", []):
+ task_var["_is_sensor"] = True
+ for k in tasks_remove:
+ task_var.pop(k, None)
+ for old, new in task_renames:
+ task_var[new] = task_var.pop(old)
+ for item in task_var.get("outlets", []):
+ if isinstance(item, dict) and "__type" in item:
+ item["__type"] = replace_dataset_in_str(item["__type"])
+ for item in task_var.get("inlets", []):
+ if isinstance(item, dict) and "__type" in item:
+ item["__type"] = replace_dataset_in_str(item["__type"])
+ var_ = item["__var"]
+ var_["name"] = None
+ var_["group"] = None
+
+ # Set on the root TG
+ dag_dict["task_group"]["group_display_name"] = ""
+
@classmethod
def from_dict(cls, serialized_obj: dict) -> SerializedDAG:
"""Deserializes a python dict in to the DAG and operators it
contains."""
ver = serialized_obj.get("__version", "<not present>")
- if ver != cls.SERIALIZER_VERSION:
+ if ver not in (1, 2):
raise ValueError(f"Unsure how to deserialize version {ver!r}")
+ if ver == 1:
+ cls.conversion_v1_to_v2(serialized_obj)
return cls.deserialize_dag(serialized_obj["dag"])
@@ -1848,8 +1977,9 @@ class TaskGroupSerialization(BaseSerialization):
group_id = cls.deserialize(encoded_group["_group_id"])
kwargs = {
key: cls.deserialize(encoded_group[key])
- for key in ["prefix_group_id", "tooltip", "ui_color",
"ui_fgcolor", "group_display_name"]
+ for key in ["prefix_group_id", "tooltip", "ui_color", "ui_fgcolor"]
}
+ kwargs["group_display_name"] =
cls.deserialize(encoded_group.get("group_display_name", ""))
if not encoded_group.get("is_mapped"):
group = TaskGroup(group_id=group_id, parent_group=parent_group, dag=dag, **kwargs)
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 205e851a9f4..5e1598a153a 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -3407,7 +3407,14 @@ export const $DagVersionResponse = {
title: "Dag Id",
},
bundle_name: {
- type: "string",
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
title: "Bundle Name",
},
bundle_version: {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 72707105751..834816afa63 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -908,7 +908,7 @@ export type DagVersionResponse = {
id: string;
version_number: number;
dag_id: string;
- bundle_name: string;
+ bundle_name: string | null;
bundle_version: string | null;
created_at: string;
readonly bundle_url: string | null;
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index f70cd7e5338..fb2f3d8f1f7 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -122,7 +122,7 @@ executor_config_pod = k8s.V1Pod(
TYPE = Encoding.TYPE
VAR = Encoding.VAR
serialized_simple_dag_ground_truth = {
- "__version": 1,
+ "__version": 2,
"dag": {
"default_args": {
"__type": "dict",
@@ -174,8 +174,6 @@ serialized_simple_dag_ground_truth = {
"retry_delay": 300.0,
"max_retry_delay": 600.0,
"downstream_task_ids": [],
- "_is_empty": False,
- "_can_skip_downstream": False,
"ui_color": "#f0ede4",
"ui_fgcolor": "#000",
"template_ext": [".sh", ".bash"],
@@ -215,8 +213,6 @@ serialized_simple_dag_ground_truth = {
"retry_delay": 300.0,
"max_retry_delay": 600.0,
"downstream_task_ids": [],
- "_is_empty": False,
- "_can_skip_downstream": False,
"_operator_extra_links": {"Google Custom":
"_link_CustomOpLink"},
"ui_color": "#fff",
"ui_fgcolor": "#000",
@@ -569,6 +565,8 @@ class TestStringifiedDAGs:
)
return dag_dict
+ expected = copy.deepcopy(expected)
+
# by roundtripping to json we get a cleaner diff
# if not doing this, we get false alarms such as "__var" != VAR
actual = json.loads(json.dumps(sorted_serialized_dag(actual)))
@@ -917,7 +915,7 @@ class TestStringifiedDAGs:
expected_timetable,
):
serialized = {
- "__version": 1,
+ "__version": 2,
"dag": {
"default_args": {"__type": "dict", "__var": {}},
"dag_id": "simple_dag",
@@ -933,7 +931,7 @@ class TestStringifiedDAGs:
def test_deserialization_timetable_unregistered(self):
serialized = {
- "__version": 1,
+ "__version": 2,
"dag": {
"default_args": {"__type": "dict", "__var": {}},
"dag_id": "simple_dag",
@@ -2199,7 +2197,7 @@ class TestStringifiedDAGs:
def test_params_upgrade(self):
"""When pre-2.2.0 param (i.e. primitive) is deserialized we convert to
Param"""
serialized = {
- "__version": 1,
+ "__version": 2,
"dag": {
"dag_id": "simple_dag",
"fileloc": "/path/to/file.py",
@@ -2220,7 +2218,7 @@ class TestStringifiedDAGs:
This test asserts that the params are still deserialized properly.
"""
serialized = {
- "__version": 1,
+ "__version": 2,
"dag": {
"dag_id": "simple_dag",
"fileloc": "/path/to/file.py",
@@ -2247,7 +2245,7 @@ class TestStringifiedDAGs:
test only to ensure that params stored in 2.2.0 can still be parsed correctly.
"""
serialized = {
- "__version": 1,
+ "__version": 2,
"dag": {
"dag_id": "simple_dag",
"fileloc": "/path/to/file.py",
@@ -2264,7 +2262,7 @@ class TestStringifiedDAGs:
def test_params_serialize_default(self):
serialized = {
- "__version": 1,
+ "__version": 2,
"dag": {
"dag_id": "simple_dag",
"fileloc": "/path/to/file.py",
@@ -2447,9 +2445,7 @@ def test_operator_expand_serde():
serialized = BaseSerialization.serialize(real_op)
assert serialized["__var"] == {
- "_is_empty": False,
"_is_mapped": True,
- "_can_skip_downstream": False,
"_task_module": "airflow.providers.standard.operators.bash",
"task_type": "BashOperator",
"start_trigger_args": None,
@@ -2509,9 +2505,7 @@ def test_operator_expand_xcomarg_serde():
serialized = BaseSerialization.serialize(mapped)
assert serialized["__var"] == {
- "_is_empty": False,
"_is_mapped": True,
- "_can_skip_downstream": False,
"_task_module": "tests_common.test_utils.mock_operators",
"task_type": "MockOperator",
"downstream_task_ids": [],
@@ -2568,9 +2562,7 @@ def test_operator_expand_kwargs_literal_serde(strict):
serialized = BaseSerialization.serialize(mapped)
assert serialized["__var"] == {
- "_is_empty": False,
"_is_mapped": True,
- "_can_skip_downstream": False,
"_task_module": "tests_common.test_utils.mock_operators",
"task_type": "MockOperator",
"downstream_task_ids": [],
@@ -2635,9 +2627,7 @@ def test_operator_expand_kwargs_xcomarg_serde(strict):
serialized = SerializedBaseOperator.serialize(mapped)
assert serialized["__var"] == {
- "_is_empty": False,
"_is_mapped": True,
- "_can_skip_downstream": False,
"_task_module": "tests_common.test_utils.mock_operators",
"task_type": "MockOperator",
"downstream_task_ids": [],
@@ -2785,9 +2775,7 @@ def test_taskflow_expand_serde():
serialized = BaseSerialization.serialize(original)
assert serialized["__var"] == {
- "_is_empty": False,
"_is_mapped": True,
- "_can_skip_downstream": False,
"_task_module": "airflow.providers.standard.decorators.python",
"task_type": "_PythonDecoratedOperator",
"_operator_name": "@task",
@@ -2900,9 +2888,7 @@ def test_taskflow_expand_kwargs_serde(strict):
serialized = BaseSerialization.serialize(original)
assert serialized["__var"] == {
- "_is_empty": False,
"_is_mapped": True,
- "_can_skip_downstream": False,
"_task_module": "airflow.providers.standard.decorators.python",
"task_type": "_PythonDecoratedOperator",
"_operator_name": "@task",
@@ -3065,9 +3051,7 @@ def test_mapped_task_with_operator_extra_links_property():
"template_fields_renderers": {},
"task_type": "_DummyOperator",
"_task_module": "unit.serialization.test_dag_serialization",
- "_is_empty": False,
"_is_mapped": True,
- "_can_skip_downstream": False,
"start_trigger_args": None,
"start_from_trigger": False,
}
@@ -3076,3 +3060,146 @@ def test_mapped_task_with_operator_extra_links_property():
assert deserialized_dag.task_dict["task"].operator_extra_links == [
XComOperatorLink(name="airflow", xcom_key="_link_AirflowLink2")
]
+
+
+def test_handle_v1_serdag():
+ v1 = {
+ "__version": 1,
+ "dag": {
+ "default_args": {
+ "__type": "dict",
+ "__var": {
+ "depends_on_past": False,
+ "retries": 1,
+ "retry_delay": {"__type": "timedelta", "__var": 300.0},
+ "max_retry_delay": {"__type": "timedelta", "__var": 600.0},
+ "sla": {"__type": "timedelta", "__var": 100.0},
+ },
+ },
+ "start_date": 1564617600.0,
+ "_task_group": {
+ "_group_id": None,
+ "prefix_group_id": True,
+ "children": {
+ "bash_task": ("operator", "bash_task"),
+ "custom_task": ("operator", "custom_task"),
+ },
+ "tooltip": "",
+ "ui_color": "CornflowerBlue",
+ "ui_fgcolor": "#000",
+ "upstream_group_ids": [],
+ "downstream_group_ids": [],
+ "upstream_task_ids": [],
+ "downstream_task_ids": [],
+ },
+ "is_paused_upon_creation": False,
+ "_dag_id": "simple_dag",
+ "doc_md": "### DAG Tutorial Documentation",
+ "fileloc": None,
+ "_processor_dags_folder": (
+ AIRFLOW_REPO_ROOT_PATH / "airflow-core" / "tests" / "unit" / "dags"
+ ).as_posix(),
+ "tasks": [
+ {
+ "__type": "operator",
+ "__var": {
+ "task_id": "bash_task",
+ "retries": 1,
+ "retry_delay": 300.0,
+ "max_retry_delay": 600.0,
+ "sla": 100.0,
+ "downstream_task_ids": [],
+ "ui_color": "#f0ede4",
+ "ui_fgcolor": "#000",
+ "template_ext": [".sh", ".bash"],
+ "template_fields": ["bash_command", "env", "cwd"],
+ "template_fields_renderers": {"bash_command": "bash",
"env": "json"},
+ "bash_command": "echo {{ task.task_id }}",
+ "_task_type": "BashOperator",
+ # Slightly difference from v2-10-stable here, we manually changed this path
+ "_task_module": "airflow.providers.standard.operators.bash",
+ "pool": "default_pool",
+ "is_setup": False,
+ "is_teardown": False,
+ "on_failure_fail_dagrun": False,
+ "executor_config": {
+ "__type": "dict",
+ "__var": {
+ "pod_override": {
+ "__type": "k8s.V1Pod",
+ "__var":
PodGenerator.serialize_pod(executor_config_pod),
+ }
+ },
+ },
+ "doc_md": "### Task Tutorial Documentation",
+ "_log_config_logger_name": "airflow.task.operators",
+ "_needs_expansion": False,
+ "weight_rule": "downstream",
+ "start_trigger_args": None,
+ "start_from_trigger": False,
+ },
+ },
+ {
+ "__type": "operator",
+ "__var": {
+ "task_id": "custom_task",
+ "retries": 1,
+ "retry_delay": 300.0,
+ "max_retry_delay": 600.0,
+ "sla": 100.0,
+ "downstream_task_ids": [],
+ "_operator_extra_links":
[{"tests.test_utils.mock_operators.CustomOpLink": {}}],
+ "ui_color": "#fff",
+ "ui_fgcolor": "#000",
+ "template_ext": [],
+ "template_fields": ["bash_command"],
+ "template_fields_renderers": {},
+ "_task_type": "CustomOperator",
+ "_operator_name": "@custom",
+ # Slightly difference from v2-10-stable here, we manually changed this path
+ "_task_module": "tests_common.test_utils.mock_operators",
+ "pool": "default_pool",
+ "is_setup": False,
+ "is_teardown": False,
+ "on_failure_fail_dagrun": False,
+ "_log_config_logger_name": "airflow.task.operators",
+ "_needs_expansion": False,
+ "weight_rule": "downstream",
+ "start_trigger_args": None,
+ "start_from_trigger": False,
+ },
+ },
+ ],
+ "schedule_interval": {"__type": "timedelta", "__var": 86400.0},
+ "timezone": "UTC",
+ "_access_control": {
+ "__type": "dict",
+ "__var": {
+ "test_role": {
+ "__type": "dict",
+ "__var": {
+ "DAGs": {
+ "__type": "set",
+ "__var": [permissions.ACTION_CAN_READ,
permissions.ACTION_CAN_EDIT],
+ }
+ },
+ }
+ },
+ },
+ "edge_info": {},
+ "dag_dependencies": [],
+ "params": [],
+ },
+ }
+
+ SerializedDAG.conversion_v1_to_v2(v1)
+
+ # Update a few subtle differences
+ v1["dag"]["tags"] = []
+ v1["dag"]["catchup"] = False
+ v1["dag"]["disable_bundle_versioning"] = False
+
+ expected = copy.deepcopy(serialized_simple_dag_ground_truth)
+ del expected["dag"]["tasks"][1]["__var"]["_operator_extra_links"]
+
+ assert v1 == expected
diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
index 2284d388567..b2e3baaeccf 100644
--- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
@@ -357,6 +357,8 @@ class MappedOperator(AbstractOperator):
def get_serialized_fields(cls):
# Not using 'cls' here since we only want to serialize base fields.
return (frozenset(attrs.fields_dict(MappedOperator)) | {"task_type"}) - {
+ "_is_empty",
+ "_can_skip_downstream",
"_task_type",
"dag",
"deps",