This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 9c01e1b3fdec95c1eff5e39cea76fa84c607bd39
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Aug 22 01:06:23 2025 +0100
[v3-0-test] fix: removed the condition because of which it is not moving to
default data (#54495) (#54639)
(cherry picked from commit 723297fcf18a5bfc48420e884516bd8efd48c2a6)
Co-authored-by: vikrantkumar-max <[email protected]>
---
.../src/airflow/dag_processing/collection.py | 22 +++++----
.../tests/unit/dag_processing/test_collection.py | 57 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 9 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/collection.py
b/airflow-core/src/airflow/dag_processing/collection.py
index e53889b945d..34592e01514 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -449,22 +449,26 @@ class DagModelOperation(NamedTuple):
# These "is not None" checks are because a LazySerializedDag
object does not
# provide the default value if the user doesn't provide an
explicit value.
- if dag.max_active_tasks is not None:
- dm.max_active_tasks = dag.max_active_tasks
- elif dag.max_active_tasks is None and dm.max_active_tasks is None:
+
+ # if dag.max_active_tasks come as None then default
max_active_tasks should be updated
+ # similar for max_consecutive_failed_dag_runs, max_active_runs
+
+ if dag.max_active_tasks is None:
dm.max_active_tasks = conf.getint("core",
"max_active_tasks_per_dag")
+ else:
+ dm.max_active_tasks = dag.max_active_tasks
- if dag.max_active_runs is not None:
- dm.max_active_runs = dag.max_active_runs
- elif dag.max_active_runs is None and dm.max_active_runs is None:
+ if dag.max_active_runs is None:
dm.max_active_runs = conf.getint("core",
"max_active_runs_per_dag")
+ else:
+ dm.max_active_runs = dag.max_active_runs
- if dag.max_consecutive_failed_dag_runs is not None:
- dm.max_consecutive_failed_dag_runs =
dag.max_consecutive_failed_dag_runs
- elif dag.max_consecutive_failed_dag_runs is None and
dm.max_consecutive_failed_dag_runs is None:
+ if dag.max_consecutive_failed_dag_runs is None:
dm.max_consecutive_failed_dag_runs = conf.getint(
"core", "max_consecutive_failed_dag_runs_per_dag"
)
+ else:
+ dm.max_consecutive_failed_dag_runs =
dag.max_consecutive_failed_dag_runs
if hasattr(dag, "has_task_concurrency_limits"):
dm.has_task_concurrency_limits =
dag.has_task_concurrency_limits
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py
b/airflow-core/tests/unit/dag_processing/test_collection.py
index 5e353f0dddc..9d9bf272ad5 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -57,6 +57,7 @@ from airflow.serialization.serialized_objects import
LazyDeserializedDAG, Serial
from airflow.utils import timezone as tz
from airflow.utils.session import create_session
+from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import (
clear_db_assets,
clear_db_dags,
@@ -814,3 +815,59 @@ class TestUpdateDagParsingResults:
orm_dag = session.get(DagModel, "mydag")
assert orm_dag.bundle_name == "testing"
assert orm_dag.bundle_version == "1.0"
+
+ def test_max_active_tasks_explicit_value_is_used(self, testing_dag_bundle,
session, dag_maker):
+ with dag_maker("dag_max_tasks", schedule=None, max_active_tasks=5) as
dag:
+ ...
+ update_dag_parsing_results_in_db("testing", None,
[self.dag_to_lazy_serdag(dag)], {}, set(), session)
+ orm_dag = session.get(DagModel, "dag_max_tasks")
+ assert orm_dag.max_active_tasks == 5
+
+ def test_max_active_tasks_defaults_from_conf_when_none(self,
testing_dag_bundle, session, dag_maker):
+ # Override config so that when DAG.max_active_tasks is None, DagModel
gets the configured default
+ with conf_vars({("core", "max_active_tasks_per_dag"): "7"}):
+ with dag_maker("dag_max_tasks_default", schedule=None) as dag:
+ ...
+ update_dag_parsing_results_in_db(
+ "testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(),
session
+ )
+ orm_dag = session.get(DagModel, "dag_max_tasks_default")
+ assert orm_dag.max_active_tasks == 7
+
+ def test_max_active_runs_explicit_value_is_used(self, testing_dag_bundle,
session, dag_maker):
+ with dag_maker("dag_max_runs", schedule=None, max_active_runs=3) as
dag:
+ ...
+ update_dag_parsing_results_in_db("testing", None,
[self.dag_to_lazy_serdag(dag)], {}, set(), session)
+ orm_dag = session.get(DagModel, "dag_max_runs")
+ assert orm_dag.max_active_runs == 3
+
+ def test_max_active_runs_defaults_from_conf_when_none(self,
testing_dag_bundle, session, dag_maker):
+ with conf_vars({("core", "max_active_runs_per_dag"): "4"}):
+ with dag_maker("dag_max_runs_default", schedule=None) as dag:
+ ...
+ update_dag_parsing_results_in_db(
+ "testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(),
session
+ )
+ orm_dag = session.get(DagModel, "dag_max_runs_default")
+ assert orm_dag.max_active_runs == 4
+
+ def test_max_consecutive_failed_dag_runs_explicit_value_is_used(
+ self, testing_dag_bundle, session, dag_maker
+ ):
+ with dag_maker("dag_max_failed_runs", schedule=None,
max_consecutive_failed_dag_runs=2) as dag:
+ ...
+ update_dag_parsing_results_in_db("testing", None,
[self.dag_to_lazy_serdag(dag)], {}, set(), session)
+ orm_dag = session.get(DagModel, "dag_max_failed_runs")
+ assert orm_dag.max_consecutive_failed_dag_runs == 2
+
+ def test_max_consecutive_failed_dag_runs_defaults_from_conf_when_none(
+ self, testing_dag_bundle, session, dag_maker
+ ):
+ with conf_vars({("core", "max_consecutive_failed_dag_runs_per_dag"):
"6"}):
+ with dag_maker("dag_max_failed_runs_default", schedule=None) as
dag:
+ ...
+ update_dag_parsing_results_in_db(
+ "testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(),
session
+ )
+ orm_dag = session.get(DagModel, "dag_max_failed_runs_default")
+ assert orm_dag.max_consecutive_failed_dag_runs == 6