This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 76eb2a09002 Fix max_active_runs lost during DAG serialisation when value equals schema default (#65310)
76eb2a09002 is described below

commit 76eb2a0900239c0df30a5e286c113b88cf60af7c
Author: Selman <[email protected]>
AuthorDate: Mon May 18 12:29:17 2026 +0300

    Fix max_active_runs lost during DAG serialisation when value equals schema default (#65310)
    
    * Fix max_active_runs lost during DAG serialisation when value equals schema default
    
    The serialisation optimisation from #55849 strips DAG fields that match
    their schema.json default. For max_active_runs, max_active_tasks, and
    max_consecutive_failed_dag_runs this is wrong because their runtime
    defaults come from airflow.cfg, not the schema. When a user explicitly
    sets max_active_runs=16 and the config has max_active_runs_per_dag=1,
    the value gets stripped (16 equals the schema default) and the dag
    table ends up with the config value, 1.
    
    Skip the schema-default exclusion for these three config-driven fields
    so they always survive serialisation.
    
    * fix: introduce DAG_DEFAULTS
    
    * chore: add test for catchup
    
    * fix: revert comment change, that wording was better
    
    * fix: preserve DAG fields whose defaults match the schema default
    
    * fix: update schema default check script
    
    * fix: address review comments on serialisation tests
    
    * fix: add disable_bundle_versioning case to parametrised serialisation test
    
    * fix: improve docstring for config-driven fields serialisation test
    
    * fix: use tuple for pytest.mark.parametrize first argument
---
 airflow-core/src/airflow/serialization/schema.json | 10 +--
 .../tests/unit/dag_processing/test_collection.py   | 18 +++++
 .../unit/serialization/test_dag_serialization.py   | 93 +++++++++++++++++++++-
 scripts/in_container/run_schema_defaults_check.py  | 11 ++-
 4 files changed, 122 insertions(+), 10 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/schema.json b/airflow-core/src/airflow/serialization/schema.json
index bed7b7e9132..e2eed749c94 100644
--- a/airflow-core/src/airflow/serialization/schema.json
+++ b/airflow-core/src/airflow/serialization/schema.json
@@ -175,7 +175,7 @@
             "value": { "$ref": "#/definitions/dict" }
           }
         },
-        "catchup": { "type": "boolean", "default": false },
+        "catchup": { "type": "boolean" },
         "allowed_run_types": {
           "anyOf": [
             { "type": "array", "items": { "type": "string" } },
@@ -204,9 +204,9 @@
             ]
         },
         "_concurrency": { "type" : "number"},
-        "max_active_tasks": { "type" : "number", "default": 16},
-        "max_active_runs": { "type" : "number", "default": 16},
-        "max_consecutive_failed_dag_runs": { "type" : "number", "default": 0},
+        "max_active_tasks": { "type" : "number" },
+        "max_active_runs": { "type" : "number" },
+        "max_consecutive_failed_dag_runs": { "type" : "number" },
         "default_args": { "$ref": "#/definitions/dict" },
         "start_date": { "$ref": "#/definitions/datetime" },
         "end_date": { "$ref": "#/definitions/datetime" },
@@ -224,7 +224,7 @@
         ]},
         "edge_info": { "$ref": "#/definitions/edge_info" },
         "dag_dependencies": { "$ref": "#/definitions/dag_dependencies" },
-        "disable_bundle_versioning": {"type":  "boolean", "default": false }
+        "disable_bundle_versioning": {"type":  "boolean" }
       },
       "required": [
         "dag_id",
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py
index 42be7792381..40224253961 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -1137,6 +1137,24 @@ class TestUpdateDagParsingResults:
         orm_dag = session.get(DagModel, "dag_max_runs")
         assert orm_dag.max_active_runs == 3
 
+    @pytest.mark.parametrize(
+        ("field", "cfg_key", "schema_default"),
+        [
+            ("max_active_runs", "max_active_runs_per_dag", 16),
+            ("max_active_tasks", "max_active_tasks_per_dag", 16),
+            ("max_consecutive_failed_dag_runs", "max_consecutive_failed_dag_runs_per_dag", 0),
+        ],
+    )
+    def test_config_driven_field_equal_to_schema_default_not_overridden_by_conf(
+        self, testing_dag_bundle, session, dag_maker, field, cfg_key, schema_default
+    ):
+        with conf_vars({("core", cfg_key): "1"}):
+            with dag_maker(f"dag_{field}_schema_default", schedule=None, **{field: schema_default}) as dag:
+                ...
+            update_dag_parsing_results_in_db("testing", None, [dag], {}, 0.1, set(), session)
+            orm_dag = session.get(DagModel, f"dag_{field}_schema_default")
+            assert getattr(orm_dag, field) == schema_default
+
     def test_max_active_runs_defaults_from_conf_when_none(self, testing_dag_bundle, session, dag_maker):
         with conf_vars({("core", "max_active_runs_per_dag"): "4"}):
             with dag_maker("dag_max_runs_default", schedule=None) as dag:
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 8d8cb113c2e..2b6d8a2edd2 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -335,6 +335,13 @@ serialized_simple_dag_ground_truth = {
             },
         ],
         "params": [],
+        # These fields have no schema default; they are always emitted on the wire
+        # because their real default comes from airflow.cfg at parse time.
+        "catchup": False,
+        "disable_bundle_versioning": False,
+        "max_active_runs": 16,
+        "max_active_tasks": 16,
+        "max_consecutive_failed_dag_runs": 0,
     },
 }
 
@@ -3844,13 +3851,14 @@ def test_dag_schema_defaults_optimization():
     dag_with_defaults = DAG(
         dag_id="test_defaults_dag",
         start_date=datetime(2023, 1, 1),
-        # These should match schema defaults and be excluded
-        catchup=False,
+        # These match remaining schema defaults and should be excluded
         fail_fast=False,
+        render_template_as_native_obj=False,
+        # These are config-driven: no schema default, always emitted on the wire
+        catchup=False,
         max_active_runs=16,
         max_active_tasks=16,
         max_consecutive_failed_dag_runs=0,
-        render_template_as_native_obj=False,
         disable_bundle_versioning=False,
         # These should be excluded as None
         description=None,
@@ -3865,6 +3873,16 @@ def test_dag_schema_defaults_optimization():
     for field in DagSerialization.get_schema_defaults("dag").keys():
         assert field not in dag_data, f"Schema default field '{field}' should be excluded"
 
+    # Config-driven fields have no schema default and are always present on the wire
+    for field in (
+        "catchup",
+        "max_active_runs",
+        "max_active_tasks",
+        "max_consecutive_failed_dag_runs",
+        "disable_bundle_versioning",
+    ):
+        assert field in dag_data, f"Config-driven field '{field}' must always be serialised"
+
     # None fields should also be excluded
     none_fields = ["description", "doc_md"]
     for field in none_fields:
@@ -3873,7 +3891,8 @@ def test_dag_schema_defaults_optimization():
     # Test deserialization restores defaults correctly
     deserialized_dag = DagSerialization.from_dict(serialized)
 
-    # Verify schema defaults are restored
+    # Verify values round-trip correctly: schema-default fields are restored from the schema,
+    # config-driven fields are read directly from the wire.
     assert deserialized_dag.catchup is False
     assert deserialized_dag.fail_fast is False
     assert deserialized_dag.max_active_runs == 16
@@ -3903,6 +3922,72 @@ def test_dag_schema_defaults_optimization():
     assert dag_non_defaults_data["description"] == "Test description"
 
 
+@pytest.mark.parametrize(
+    ("cfg_overrides", "dag_kwargs", "expected_wire"),
+    [
+        pytest.param(
+            {
+                ("core", "max_active_runs_per_dag"): "1",
+                ("core", "max_active_tasks_per_dag"): "1",
+                ("core", "max_consecutive_failed_dag_runs_per_dag"): "1",
+            },
+            {
+                "dag_id": "test_dag_fields_cfg_ne_user",
+                "max_active_runs": 16,
+                "max_active_tasks": 16,
+                "max_consecutive_failed_dag_runs": 0,
+            },
+            {"max_active_runs": 16, "max_active_tasks": 16, "max_consecutive_failed_dag_runs": 0},
+            id="user_value_differs_from_cfg",
+        ),
+        pytest.param(
+            {
+                ("core", "max_active_runs_per_dag"): "16",
+                ("core", "max_active_tasks_per_dag"): "16",
+                ("core", "max_consecutive_failed_dag_runs_per_dag"): "0",
+            },
+            {
+                "dag_id": "test_dag_fields_cfg_eq_user",
+                "max_active_runs": 16,
+                "max_active_tasks": 16,
+                "max_consecutive_failed_dag_runs": 0,
+            },
+            {"max_active_runs": 16, "max_active_tasks": 16, "max_consecutive_failed_dag_runs": 0},
+            id="user_value_equals_cfg",
+        ),
+        pytest.param(
+            {("scheduler", "catchup_by_default"): "True"},
+            {"dag_id": "test_dag_catchup_override", "catchup": False},
+            {"catchup": False},
+            id="catchup_false_with_catchup_by_default_true",
+        ),
+        pytest.param(
+            {("dag_processor", "disable_bundle_versioning"): "False"},
+            {"dag_id": "test_dag_disable_bundle_versioning", "disable_bundle_versioning": True},
+            {"disable_bundle_versioning": True},
+            id="disable_bundle_versioning_true_with_cfg_false",
+        ),
+    ],
+)
+def test_dag_config_driven_fields_always_serialized(cfg_overrides, dag_kwargs, expected_wire):
+    """Config-driven DAG fields are always present on the wire regardless of the airflow.cfg value.
+
+    Fields like max_active_runs and other config-driven fields were silently dropped during
+    serialisation when their value matched the schema default, regardless of what airflow.cfg
+    was set to. #55849 excluded any field whose value matched the schema default.
+    """
+    with conf_vars(cfg_overrides):
+        dag = DAG(start_date=datetime(2023, 1, 1), **dag_kwargs)
+        serialized = DagSerialization.to_dict(dag)
+
+    for field, value in expected_wire.items():
+        assert serialized["dag"][field] == value
+
+    lazy_dag = LazyDeserializedDAG(data=serialized)
+    for field, value in expected_wire.items():
+        assert getattr(lazy_dag, field) == value
+
+
 def test_email_optimization_removes_email_attrs_when_email_empty():
     """Test that email_on_failure and email_on_retry are removed when email is empty."""
     with DAG(dag_id="test_email_optimization") as dag:
diff --git a/scripts/in_container/run_schema_defaults_check.py b/scripts/in_container/run_schema_defaults_check.py
index 9b754f265ca..7afcbd9ce54 100755
--- a/scripts/in_container/run_schema_defaults_check.py
+++ b/scripts/in_container/run_schema_defaults_check.py
@@ -200,7 +200,16 @@ def compare_dag_defaults() -> list[str]:
             if (
                 server_value is not None
                 and server_value not in [[], {}, (), set()]
-                and field_name not in ["dag_id", "dag_display_name"]
+                and field_name
+                not in [
+                    "dag_id",
+                    "dag_display_name",
+                    "max_active_runs",
+                    "max_active_tasks",
+                    "max_consecutive_failed_dag_runs",
+                    "catchup",
+                    "disable_bundle_versioning",
+                ]
             ):
                 errors.append(
                     f"DAG server field '{field_name}' has default {server_value!r} but no schema default"

Reply via email to