This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4da93d0b60a fix(serialized_objects): handle both inlet and outlet in v1 to v2 convert (#49286)
4da93d0b60a is described below
commit 4da93d0b60a1adf310ec06929d6676360a639af7
Author: Wei Lee <[email protected]>
AuthorDate: Tue Apr 15 21:13:34 2025 +0800
fix(serialized_objects): handle both inlet and outlet in v1 to v2 convert (#49286)
* fix(serialized_objects): handle both inlet and outlet in v1 to v2 convert
* fix-migration
---------
Co-authored-by: vatsrahul1001 <[email protected]>
---
.../airflow/serialization/serialized_objects.py | 17 ++---
.../unit/serialization/test_dag_serialization.py | 72 +++++++++++++++++++++-
2 files changed, 78 insertions(+), 11 deletions(-)
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 2340c171970..8dd2b9294d2 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -1840,7 +1840,8 @@ class SerializedDAG(DAG, BaseSerialization):
return obj
for old, new in dag_renames:
- dag_dict[new] = dag_dict.pop(old)
+ if old in dag_dict:
+ dag_dict[new] = dag_dict.pop(old)
if default_args := dag_dict.get("default_args"):
for k in tasks_remove:
@@ -1903,15 +1904,15 @@ class SerializedDAG(DAG, BaseSerialization):
task_var.pop(k, None)
for old, new in task_renames:
task_var[new] = task_var.pop(old)
- for item in task_var.get("outlets", []):
+ for item in itertools.chain(*(task_var.get(key, []) for key in ("inlets", "outlets"))):
+ original_item_type = item["__type"]
if isinstance(item, dict) and "__type" in item:
- item["__type"] = replace_dataset_in_str(item["__type"])
- for item in task_var.get("inlets", []):
- if isinstance(item, dict) and "__type" in item:
- item["__type"] = replace_dataset_in_str(item["__type"])
+ item["__type"] = replace_dataset_in_str(original_item_type)
+
var_ = item["__var"]
- var_["name"] = None
- var_["group"] = None
+ if original_item_type == "dataset":
+ var_["name"] = var_["uri"]
+ var_["group"] = "asset"
# Set on the root TG
dag_dict["task_group"]["group_display_name"] = ""
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index fb2f3d8f1f7..d3e605063d8 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -64,7 +64,7 @@ from airflow.models.xcom import XComModel
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.sensors.bash import BashSensor
-from airflow.sdk import teardown
+from airflow.sdk import AssetAlias, teardown
from airflow.sdk.bases.decorator import DecoratedOperator
from airflow.sdk.definitions._internal.expandinput import EXPAND_INPUT_EMPTY
from airflow.sdk.definitions.asset import Asset, AssetUniqueKey
@@ -203,6 +203,32 @@ serialized_simple_dag_ground_truth = {
"weight_rule": "downstream",
"start_trigger_args": None,
"start_from_trigger": False,
+ "inlets": [
+ {
+ "__type": "asset",
+ "__var": {
+ "extra": {},
+ "group": "asset",
+ "name": "asset-1",
+ "uri": "asset-1",
+ },
+ },
+ {
+ "__type": "asset_alias",
+ "__var": {"group": "asset", "name": "alias-name"},
+ },
+ ],
+ "outlets": [
+ {
+ "__type": "asset",
+ "__var": {
+ "extra": {},
+ "group": "asset",
+ "name": "asset-2",
+ "uri": "asset-2",
+ },
+ },
+ ],
},
},
{
@@ -252,7 +278,15 @@ serialized_simple_dag_ground_truth = {
},
},
"edge_info": {},
- "dag_dependencies": [],
+ "dag_dependencies": [
+ {
+ "dependency_id": '{"name": "asset-2", "uri": "asset-2"}',
+ "dependency_type": "asset",
+ "label": "asset-2",
+ "source": "simple_dag",
+ "target": "asset",
+ },
+ ],
"params": [],
"tags": [],
},
@@ -301,6 +335,8 @@ def make_simple_dag():
owner="airflow",
executor_config={"pod_override": executor_config_pod},
doc_md="### Task Tutorial Documentation",
+ inlets=[Asset("asset-1"), AssetAlias(name="alias-name")],
+ outlets=Asset("asset-2"),
)
return dag
@@ -3137,6 +3173,28 @@ def test_handle_v1_serdag():
"weight_rule": "downstream",
"start_trigger_args": None,
"start_from_trigger": False,
+ "inlets": [
+ {
+ "__type": "dataset",
+ "__var": {
+ "extra": {},
+ "uri": "asset-1",
+ },
+ },
+ {
+ "__type": "dataset_alias",
+ "__var": {"name": "alias-name"},
+ },
+ ],
+ "outlets": [
+ {
+ "__type": "dataset",
+ "__var": {
+ "extra": {},
+ "uri": "asset-2",
+ },
+ },
+ ],
},
},
{
@@ -3187,7 +3245,15 @@ def test_handle_v1_serdag():
},
},
"edge_info": {},
- "dag_dependencies": [],
+ "dag_dependencies": [
+ {
+ "dependency_id": '{"name": "asset-2", "uri": "asset-2"}',
+ "dependency_type": "asset",
+ "label": "asset-2",
+ "source": "simple_dag",
+ "target": "asset",
+ },
+ ],
"params": [],
},
}