This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new a5faed920af Fix DAG version inflation caused by mismatched serialized
task results when using the reserialize command (#61077) (#66861)
a5faed920af is described below
commit a5faed920af8917710bcc15d6797188593450bb7
Author: Rahul Vats <[email protected]>
AuthorDate: Fri May 15 12:26:50 2026 +0530
Fix DAG version inflation caused by mismatched serialized task results
when using the reserialize command (#61077) (#66861)
Closes: #60868
(cherry picked from commit c4252ba97b2b00bd7e92f0ad4a8dcfdd336ed849)
Co-authored-by: Jeongwoo Do <[email protected]>
---
airflow-core/src/airflow/models/serialized_dag.py | 2 +-
.../tests/unit/cli/commands/test_dag_command.py | 30 ++++++++++++++++++
.../tests/unit/dags/test_dag_reserialize.py | 36 ++++++++++++++++++++++
3 files changed, 67 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/models/serialized_dag.py
b/airflow-core/src/airflow/models/serialized_dag.py
index 8323c2228ff..be55c3e0908 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -383,7 +383,7 @@ class SerializedDagModel(Base):
"""Recursively sort json_dict and its nested dictionaries and lists."""
if isinstance(serialized_dag, dict):
return {k: cls._sort_serialized_dag_dict(v) for k, v in
sorted(serialized_dag.items())}
- if isinstance(serialized_dag, list):
+ if isinstance(serialized_dag, (list, tuple)):
if all(isinstance(i, dict) for i in serialized_dag):
if all(
isinstance(i.get("__var", {}), Iterable) and "task_id" in
i.get("__var", {})
diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py
b/airflow-core/tests/unit/cli/commands/test_dag_command.py
index 208fd9bb074..cb59144f107 100644
--- a/airflow-core/tests/unit/cli/commands/test_dag_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py
@@ -25,6 +25,7 @@ from datetime import datetime, timedelta
from unittest import mock
from unittest.mock import MagicMock
+import msgspec
import pendulum
import pytest
import time_machine
@@ -35,6 +36,7 @@ from airflow._shared.timezones import timezone
from airflow.cli import cli_parser
from airflow.cli.commands import dag_command
from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
+from airflow.dag_processing.processor import DagFileParsingResult,
DagFileProcessorProcess
from airflow.exceptions import AirflowException
from airflow.models import DagModel, DagRun
from airflow.models.dagbag import DBDagBag
@@ -42,6 +44,8 @@ from airflow.models.serialized_dag import SerializedDagModel
from airflow.providers.standard.triggers.temporal import DateTimeTrigger,
TimeDeltaTrigger
from airflow.sdk import BaseOperator, task
from airflow.sdk.definitions.dag import _run_inline_trigger
+from airflow.sdk.execution_time.comms import _RequestFrame, _ResponseFrame
+from airflow.serialization.serialized_objects import DagSerialization,
LazyDeserializedDAG
from airflow.triggers.base import TriggerEvent
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState
@@ -1070,3 +1074,29 @@ class TestCliDagsReserialize:
serialized_dag_ids =
set(session.execute(select(SerializedDagModel.dag_id)).scalars())
assert serialized_dag_ids == {"test_example_bash_operator",
"test_sensor"}
+
+ @conf_vars({("core", "load_examples"): "false"})
+ def test_reserialize_should_make_equal_hash_with_dag_processor(self,
configure_dag_bundles, session):
+ bundles = {"bundle_reserialize": TEST_DAGS_FOLDER /
"test_dag_reserialize.py"}
+ with configure_dag_bundles(bundles):
+ dag_command.dag_reserialize(
+ self.parser.parse_args(["dags", "reserialize",
"--bundle-name", "bundle_reserialize"])
+ )
+
+ dagbag = DagBag(bundles["bundle_reserialize"],
bundle_path=bundles["bundle_reserialize"])
+ dag_parsing_result = DagFileParsingResult(
+ fileloc=bundles["bundle_reserialize"].name,
+ serialized_dags=[
+ LazyDeserializedDAG(data=DagSerialization.to_dict(dag)) for
dag in dagbag.dags.values()
+ ],
+ )
+
+ frame = _ResponseFrame(id=0,
body=dag_parsing_result.model_dump()).as_bytes()
+ request_frame =
msgspec.msgpack.Decoder[_RequestFrame](_RequestFrame).decode(frame[4:])
+ dag_processor_parsing_result =
DagFileProcessorProcess.decoder.validate_python(request_frame.body)
+
+ serialized_dag_hash =
list(session.execute(select(SerializedDagModel.dag_hash)).scalars())
+
+ assert len(dag_processor_parsing_result.serialized_dags) == 1
+ assert len(serialized_dag_hash) == 1
+ assert dag_processor_parsing_result.serialized_dags[0].hash ==
serialized_dag_hash[0]
diff --git a/airflow-core/tests/unit/dags/test_dag_reserialize.py
b/airflow-core/tests/unit/dags/test_dag_reserialize.py
new file mode 100644
index 00000000000..c9eba5ca3e1
--- /dev/null
+++ b/airflow-core/tests/unit/dags/test_dag_reserialize.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.providers.standard.operators.python import PythonOperator
+from airflow.sdk import DAG
+
+
+def empty_task():
+ pass
+
+
+with DAG(
+ "test_dag_reserialize",
+ start_date=datetime(2026, 1, 20),
+ schedule="* * * * *",
+ catchup=False,
+ max_active_runs=1,
+) as dag:
+ task_b = PythonOperator(task_id="bear", python_callable=empty_task)