This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9c01e1b3fdec95c1eff5e39cea76fa84c607bd39
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Aug 22 01:06:23 2025 +0100

    [v3-0-test] fix: removed the condition because of which it is not moving to 
default data (#54495) (#54639)
    
    (cherry picked from commit 723297fcf18a5bfc48420e884516bd8efd48c2a6)
    
    Co-authored-by: vikrantkumar-max <[email protected]>
---
 .../src/airflow/dag_processing/collection.py       | 22 +++++----
 .../tests/unit/dag_processing/test_collection.py   | 57 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 9 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/collection.py 
b/airflow-core/src/airflow/dag_processing/collection.py
index e53889b945d..34592e01514 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -449,22 +449,26 @@ class DagModelOperation(NamedTuple):
 
             # These "is not None" checks are because a LazySerializedDag 
object does not
             # provide the default value if the user doesn't provide an 
explicit value.
-            if dag.max_active_tasks is not None:
-                dm.max_active_tasks = dag.max_active_tasks
-            elif dag.max_active_tasks is None and dm.max_active_tasks is None:
+
+            # if dag.max_active_tasks come as None then default 
max_active_tasks should be updated
+            # similar for max_consecutive_failed_dag_runs, max_active_runs
+
+            if dag.max_active_tasks is None:
                 dm.max_active_tasks = conf.getint("core", 
"max_active_tasks_per_dag")
+            else:
+                dm.max_active_tasks = dag.max_active_tasks
 
-            if dag.max_active_runs is not None:
-                dm.max_active_runs = dag.max_active_runs
-            elif dag.max_active_runs is None and dm.max_active_runs is None:
+            if dag.max_active_runs is None:
                 dm.max_active_runs = conf.getint("core", 
"max_active_runs_per_dag")
+            else:
+                dm.max_active_runs = dag.max_active_runs
 
-            if dag.max_consecutive_failed_dag_runs is not None:
-                dm.max_consecutive_failed_dag_runs = 
dag.max_consecutive_failed_dag_runs
-            elif dag.max_consecutive_failed_dag_runs is None and 
dm.max_consecutive_failed_dag_runs is None:
+            if dag.max_consecutive_failed_dag_runs is None:
                 dm.max_consecutive_failed_dag_runs = conf.getint(
                     "core", "max_consecutive_failed_dag_runs_per_dag"
                 )
+            else:
+                dm.max_consecutive_failed_dag_runs = 
dag.max_consecutive_failed_dag_runs
 
             if hasattr(dag, "has_task_concurrency_limits"):
                 dm.has_task_concurrency_limits = 
dag.has_task_concurrency_limits
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py 
b/airflow-core/tests/unit/dag_processing/test_collection.py
index 5e353f0dddc..9d9bf272ad5 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -57,6 +57,7 @@ from airflow.serialization.serialized_objects import 
LazyDeserializedDAG, Serial
 from airflow.utils import timezone as tz
 from airflow.utils.session import create_session
 
+from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.db import (
     clear_db_assets,
     clear_db_dags,
@@ -814,3 +815,59 @@ class TestUpdateDagParsingResults:
         orm_dag = session.get(DagModel, "mydag")
         assert orm_dag.bundle_name == "testing"
         assert orm_dag.bundle_version == "1.0"
+
+    def test_max_active_tasks_explicit_value_is_used(self, testing_dag_bundle, 
session, dag_maker):
+        with dag_maker("dag_max_tasks", schedule=None, max_active_tasks=5) as 
dag:
+            ...
+        update_dag_parsing_results_in_db("testing", None, 
[self.dag_to_lazy_serdag(dag)], {}, set(), session)
+        orm_dag = session.get(DagModel, "dag_max_tasks")
+        assert orm_dag.max_active_tasks == 5
+
+    def test_max_active_tasks_defaults_from_conf_when_none(self, 
testing_dag_bundle, session, dag_maker):
+        # Override config so that when DAG.max_active_tasks is None, DagModel 
gets the configured default
+        with conf_vars({("core", "max_active_tasks_per_dag"): "7"}):
+            with dag_maker("dag_max_tasks_default", schedule=None) as dag:
+                ...
+            update_dag_parsing_results_in_db(
+                "testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(), 
session
+            )
+            orm_dag = session.get(DagModel, "dag_max_tasks_default")
+            assert orm_dag.max_active_tasks == 7
+
+    def test_max_active_runs_explicit_value_is_used(self, testing_dag_bundle, 
session, dag_maker):
+        with dag_maker("dag_max_runs", schedule=None, max_active_runs=3) as 
dag:
+            ...
+        update_dag_parsing_results_in_db("testing", None, 
[self.dag_to_lazy_serdag(dag)], {}, set(), session)
+        orm_dag = session.get(DagModel, "dag_max_runs")
+        assert orm_dag.max_active_runs == 3
+
+    def test_max_active_runs_defaults_from_conf_when_none(self, 
testing_dag_bundle, session, dag_maker):
+        with conf_vars({("core", "max_active_runs_per_dag"): "4"}):
+            with dag_maker("dag_max_runs_default", schedule=None) as dag:
+                ...
+            update_dag_parsing_results_in_db(
+                "testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(), 
session
+            )
+            orm_dag = session.get(DagModel, "dag_max_runs_default")
+            assert orm_dag.max_active_runs == 4
+
+    def test_max_consecutive_failed_dag_runs_explicit_value_is_used(
+        self, testing_dag_bundle, session, dag_maker
+    ):
+        with dag_maker("dag_max_failed_runs", schedule=None, 
max_consecutive_failed_dag_runs=2) as dag:
+            ...
+        update_dag_parsing_results_in_db("testing", None, 
[self.dag_to_lazy_serdag(dag)], {}, set(), session)
+        orm_dag = session.get(DagModel, "dag_max_failed_runs")
+        assert orm_dag.max_consecutive_failed_dag_runs == 2
+
+    def test_max_consecutive_failed_dag_runs_defaults_from_conf_when_none(
+        self, testing_dag_bundle, session, dag_maker
+    ):
+        with conf_vars({("core", "max_consecutive_failed_dag_runs_per_dag"): 
"6"}):
+            with dag_maker("dag_max_failed_runs_default", schedule=None) as 
dag:
+                ...
+            update_dag_parsing_results_in_db(
+                "testing", None, [self.dag_to_lazy_serdag(dag)], {}, set(), 
session
+            )
+            orm_dag = session.get(DagModel, "dag_max_failed_runs_default")
+            assert orm_dag.max_consecutive_failed_dag_runs == 6

Reply via email to