This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
new 28c413207df [v3-0-test] Update max_consecutive_failed_dag_runs default value to zero in TaskSDK dag (#49795) (#49803)
28c413207df is described below
commit 28c413207dfd5a75d71d20d1204a079da7ecf01c
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Apr 26 08:09:01 2025 +0200
[v3-0-test] Update max_consecutive_failed_dag_runs default value to zero in TaskSDK dag (#49795) (#49803)
* Update max_consecutive_failed_dag_runs default value to zero in TaskSDK
dag
* Update max_consecutive_failed_dag_runs default value to zero in TaskSDK
dag
(cherry picked from commit 59e76068505495e5e0bf0b6e8a102e6b67780610)
Co-authored-by: GPK <[email protected]>
---
airflow-core/src/airflow/models/dag.py | 11 +----------
.../tests/unit/serialization/test_dag_serialization.py | 2 +-
task-sdk/src/airflow/sdk/definitions/dag.py | 4 +---
3 files changed, 3 insertions(+), 14 deletions(-)
diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index fb18cf4cd81..676541e8578 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -2042,16 +2042,7 @@ class DAG(TaskSDKDag, LoggingMixin):
if not field.init or field.name in ["edge_info"]:
continue
- value = getattr(dag, field.name)
-
- # Handle special cases where values need conversion
- if field.name == "max_consecutive_failed_dag_runs":
- # SchedulerDAG requires this to be >= 0, while TaskSDKDag allows -1
- if value == -1:
- # If it is -1, we get the default value from the DAG
- continue
-
- kwargs[field.name] = value
+ kwargs[field.name] = getattr(dag, field.name)
new_dag = cls(**kwargs)
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 69570392158..d1d2dde7852 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -685,10 +685,10 @@ class TestStringifiedDAGs:
for field in fields_to_check:
actual = getattr(serialized_dag, field)
expected = getattr(dag, field, None)
+
assert actual == expected, f"{dag.dag_id}.{field} does not match"
# _processor_dags_folder is only populated at serialization time
# it's only used when relying on serialized dag to determine a dag's relative path
- assert dag._processor_dags_folder is None
assert (
serialized_dag._processor_dags_folder
== (AIRFLOW_REPO_ROOT_PATH / "airflow-core" / "tests" / "unit" / "dags").as_posix()
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py
index afe5d117a74..8b6ed3e4b19 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -399,9 +399,7 @@ class DAG:
user_defined_filters: dict | None = None
max_active_tasks: int = attrs.field(default=16, validator=attrs.validators.instance_of(int))
max_active_runs: int = attrs.field(default=16, validator=attrs.validators.instance_of(int))
- max_consecutive_failed_dag_runs: int = attrs.field(
- default=-1, validator=attrs.validators.instance_of(int)
- )
+ max_consecutive_failed_dag_runs: int = attrs.field(default=0, validator=attrs.validators.instance_of(int))
dagrun_timeout: timedelta | None = attrs.field(
default=None,
validator=attrs.validators.optional(attrs.validators.instance_of(timedelta)),