This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 76eb2a09002 Fix max_active_runs lost during DAG serialisation when
value equals schema default (#65310)
76eb2a09002 is described below
commit 76eb2a0900239c0df30a5e286c113b88cf60af7c
Author: Selman <[email protected]>
AuthorDate: Mon May 18 12:29:17 2026 +0300
Fix max_active_runs lost during DAG serialisation when value equals schema
default (#65310)
* Fix max_active_runs lost during DAG serialisation when value equals
schema default
The serialisation optimisation from #55849 strips DAG fields that match
their schema.json default. For max_active_runs, max_active_tasks, and
max_consecutive_failed_dag_runs this is wrong because their runtime
defaults come from airflow.cfg, not the schema. When a user explicitly
sets max_active_runs=16 and the config has max_active_runs_per_dag=1,
the value gets stripped and the dag table ends up with 1.
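Skip the schema-default exclusion for these three config-driven fields
so they always survive serialisation. The final change does this by
removing their defaults from schema.json; catchup and
disable_bundle_versioning, whose defaults also come from airflow.cfg,
get the same treatment.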
* fix: introduce DAG_DEFAULTS
* chore: add test for catchup
* fix: revert comment change; the original wording was better
* fix: preserve DAG fields whose defaults match the schema default
* fix: update schema default check script
* fix: address review comments on serialisation tests
* fix: add disable_bundle_versioning case to parametrised serialisation test
* fix: improve docstring for config-driven fields serialisation test
* fix: use tuple for pytest.mark.parametrize first argument
---
airflow-core/src/airflow/serialization/schema.json | 10 +--
.../tests/unit/dag_processing/test_collection.py | 18 +++++
.../unit/serialization/test_dag_serialization.py | 93 +++++++++++++++++++++-
scripts/in_container/run_schema_defaults_check.py | 11 ++-
4 files changed, 122 insertions(+), 10 deletions(-)
diff --git a/airflow-core/src/airflow/serialization/schema.json
b/airflow-core/src/airflow/serialization/schema.json
index bed7b7e9132..e2eed749c94 100644
--- a/airflow-core/src/airflow/serialization/schema.json
+++ b/airflow-core/src/airflow/serialization/schema.json
@@ -175,7 +175,7 @@
"value": { "$ref": "#/definitions/dict" }
}
},
- "catchup": { "type": "boolean", "default": false },
+ "catchup": { "type": "boolean" },
"allowed_run_types": {
"anyOf": [
{ "type": "array", "items": { "type": "string" } },
@@ -204,9 +204,9 @@
]
},
"_concurrency": { "type" : "number"},
- "max_active_tasks": { "type" : "number", "default": 16},
- "max_active_runs": { "type" : "number", "default": 16},
- "max_consecutive_failed_dag_runs": { "type" : "number", "default": 0},
+ "max_active_tasks": { "type" : "number" },
+ "max_active_runs": { "type" : "number" },
+ "max_consecutive_failed_dag_runs": { "type" : "number" },
"default_args": { "$ref": "#/definitions/dict" },
"start_date": { "$ref": "#/definitions/datetime" },
"end_date": { "$ref": "#/definitions/datetime" },
@@ -224,7 +224,7 @@
]},
"edge_info": { "$ref": "#/definitions/edge_info" },
"dag_dependencies": { "$ref": "#/definitions/dag_dependencies" },
- "disable_bundle_versioning": {"type": "boolean", "default": false }
+ "disable_bundle_versioning": {"type": "boolean" }
},
"required": [
"dag_id",
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py
b/airflow-core/tests/unit/dag_processing/test_collection.py
index 42be7792381..40224253961 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -1137,6 +1137,24 @@ class TestUpdateDagParsingResults:
orm_dag = session.get(DagModel, "dag_max_runs")
assert orm_dag.max_active_runs == 3
+ @pytest.mark.parametrize(
+ ("field", "cfg_key", "schema_default"),
+ [
+ ("max_active_runs", "max_active_runs_per_dag", 16),
+ ("max_active_tasks", "max_active_tasks_per_dag", 16),
+ ("max_consecutive_failed_dag_runs",
"max_consecutive_failed_dag_runs_per_dag", 0),
+ ],
+ )
+ def
test_config_driven_field_equal_to_schema_default_not_overridden_by_conf(
+ self, testing_dag_bundle, session, dag_maker, field, cfg_key,
schema_default
+ ):
+ with conf_vars({("core", cfg_key): "1"}):
+ with dag_maker(f"dag_{field}_schema_default", schedule=None,
**{field: schema_default}) as dag:
+ ...
+ update_dag_parsing_results_in_db("testing", None, [dag], {}, 0.1,
set(), session)
+ orm_dag = session.get(DagModel, f"dag_{field}_schema_default")
+ assert getattr(orm_dag, field) == schema_default
+
def test_max_active_runs_defaults_from_conf_when_none(self,
testing_dag_bundle, session, dag_maker):
with conf_vars({("core", "max_active_runs_per_dag"): "4"}):
with dag_maker("dag_max_runs_default", schedule=None) as dag:
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py
b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 8d8cb113c2e..2b6d8a2edd2 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -335,6 +335,13 @@ serialized_simple_dag_ground_truth = {
},
],
"params": [],
+ # These fields have no schema default; they are always emitted on the
wire
+ # because their real default comes from airflow.cfg at parse time.
+ "catchup": False,
+ "disable_bundle_versioning": False,
+ "max_active_runs": 16,
+ "max_active_tasks": 16,
+ "max_consecutive_failed_dag_runs": 0,
},
}
@@ -3844,13 +3851,14 @@ def test_dag_schema_defaults_optimization():
dag_with_defaults = DAG(
dag_id="test_defaults_dag",
start_date=datetime(2023, 1, 1),
- # These should match schema defaults and be excluded
- catchup=False,
+ # These match remaining schema defaults and should be excluded
fail_fast=False,
+ render_template_as_native_obj=False,
+ # These are config-driven: no schema default, always emitted on the
wire
+ catchup=False,
max_active_runs=16,
max_active_tasks=16,
max_consecutive_failed_dag_runs=0,
- render_template_as_native_obj=False,
disable_bundle_versioning=False,
# These should be excluded as None
description=None,
@@ -3865,6 +3873,16 @@ def test_dag_schema_defaults_optimization():
for field in DagSerialization.get_schema_defaults("dag").keys():
assert field not in dag_data, f"Schema default field '{field}' should
be excluded"
+ # Config-driven fields have no schema default and are always present on
the wire
+ for field in (
+ "catchup",
+ "max_active_runs",
+ "max_active_tasks",
+ "max_consecutive_failed_dag_runs",
+ "disable_bundle_versioning",
+ ):
+ assert field in dag_data, f"Config-driven field '{field}' must always
be serialised"
+
# None fields should also be excluded
none_fields = ["description", "doc_md"]
for field in none_fields:
@@ -3873,7 +3891,8 @@ def test_dag_schema_defaults_optimization():
# Test deserialization restores defaults correctly
deserialized_dag = DagSerialization.from_dict(serialized)
- # Verify schema defaults are restored
+ # Verify values round-trip correctly: schema-default fields are restored
from the schema,
+ # config-driven fields are read directly from the wire.
assert deserialized_dag.catchup is False
assert deserialized_dag.fail_fast is False
assert deserialized_dag.max_active_runs == 16
@@ -3903,6 +3922,72 @@ def test_dag_schema_defaults_optimization():
assert dag_non_defaults_data["description"] == "Test description"
[email protected](
+ ("cfg_overrides", "dag_kwargs", "expected_wire"),
+ [
+ pytest.param(
+ {
+ ("core", "max_active_runs_per_dag"): "1",
+ ("core", "max_active_tasks_per_dag"): "1",
+ ("core", "max_consecutive_failed_dag_runs_per_dag"): "1",
+ },
+ {
+ "dag_id": "test_dag_fields_cfg_ne_user",
+ "max_active_runs": 16,
+ "max_active_tasks": 16,
+ "max_consecutive_failed_dag_runs": 0,
+ },
+ {"max_active_runs": 16, "max_active_tasks": 16,
"max_consecutive_failed_dag_runs": 0},
+ id="user_value_differs_from_cfg",
+ ),
+ pytest.param(
+ {
+ ("core", "max_active_runs_per_dag"): "16",
+ ("core", "max_active_tasks_per_dag"): "16",
+ ("core", "max_consecutive_failed_dag_runs_per_dag"): "0",
+ },
+ {
+ "dag_id": "test_dag_fields_cfg_eq_user",
+ "max_active_runs": 16,
+ "max_active_tasks": 16,
+ "max_consecutive_failed_dag_runs": 0,
+ },
+ {"max_active_runs": 16, "max_active_tasks": 16,
"max_consecutive_failed_dag_runs": 0},
+ id="user_value_equals_cfg",
+ ),
+ pytest.param(
+ {("scheduler", "catchup_by_default"): "True"},
+ {"dag_id": "test_dag_catchup_override", "catchup": False},
+ {"catchup": False},
+ id="catchup_false_with_catchup_by_default_true",
+ ),
+ pytest.param(
+ {("dag_processor", "disable_bundle_versioning"): "False"},
+ {"dag_id": "test_dag_disable_bundle_versioning",
"disable_bundle_versioning": True},
+ {"disable_bundle_versioning": True},
+ id="disable_bundle_versioning_true_with_cfg_false",
+ ),
+ ],
+)
+def test_dag_config_driven_fields_always_serialized(cfg_overrides, dag_kwargs,
expected_wire):
+ """Config-driven DAG fields are always present on the wire regardless of
the airflow.cfg value.
+
+ Fields like max_active_runs and other config-driven fields were silently
dropped during
+ serialisation when their value matched the schema default, regardless of
what airflow.cfg
+ was set to. #55849 excluded any field whose value matched the schema
default.
+ """
+ with conf_vars(cfg_overrides):
+ dag = DAG(start_date=datetime(2023, 1, 1), **dag_kwargs)
+ serialized = DagSerialization.to_dict(dag)
+
+ for field, value in expected_wire.items():
+ assert serialized["dag"][field] == value
+
+ lazy_dag = LazyDeserializedDAG(data=serialized)
+ for field, value in expected_wire.items():
+ assert getattr(lazy_dag, field) == value
+
+
def test_email_optimization_removes_email_attrs_when_email_empty():
"""Test that email_on_failure and email_on_retry are removed when email is
empty."""
with DAG(dag_id="test_email_optimization") as dag:
diff --git a/scripts/in_container/run_schema_defaults_check.py
b/scripts/in_container/run_schema_defaults_check.py
index 9b754f265ca..7afcbd9ce54 100755
--- a/scripts/in_container/run_schema_defaults_check.py
+++ b/scripts/in_container/run_schema_defaults_check.py
@@ -200,7 +200,16 @@ def compare_dag_defaults() -> list[str]:
if (
server_value is not None
and server_value not in [[], {}, (), set()]
- and field_name not in ["dag_id", "dag_display_name"]
+ and field_name
+ not in [
+ "dag_id",
+ "dag_display_name",
+ "max_active_runs",
+ "max_active_tasks",
+ "max_consecutive_failed_dag_runs",
+ "catchup",
+ "disable_bundle_versioning",
+ ]
):
errors.append(
f"DAG server field '{field_name}' has default
{server_value!r} but no schema default"