This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 654ccd689b2 [v3-1-test] fix: Add ``max_retry_delay`` to ``MappedOperator`` model (#56396) (#56951)
654ccd689b2 is described below
commit 654ccd689b20eb07230d49f8a02836aba8385162
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 21 16:41:31 2025 +0100
[v3-1-test] fix: Add ``max_retry_delay`` to ``MappedOperator`` model (#56396) (#56951)
Fixes #56184
(cherry picked from commit 871ff3a010c91de84af291cdc7ae4fa15964a77f)
Co-authored-by: David Blain <[email protected]>
---
airflow-core/src/airflow/models/mappedoperator.py | 4 +
.../tests/unit/models/test_mappedoperator.py | 178 +++++++++++++++++++++
2 files changed, 182 insertions(+)
diff --git a/airflow-core/src/airflow/models/mappedoperator.py b/airflow-core/src/airflow/models/mappedoperator.py
index e7fa851e695..9e7a294e5b5 100644
--- a/airflow-core/src/airflow/models/mappedoperator.py
+++ b/airflow-core/src/airflow/models/mappedoperator.py
@@ -280,6 +280,10 @@ class MappedOperator(DAGNode):
def retry_exponential_backoff(self) -> bool:
return bool(self.partial_kwargs.get("retry_exponential_backoff"))
+ @property
+ def max_retry_delay(self) -> datetime.timedelta | None:
+ return self.partial_kwargs.get("max_retry_delay")
+
@property
def weight_rule(self) -> PriorityWeightStrategy:
return validate_and_load_priority_weight_strategy(
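In practice, the new property means a mapped operator built with ``partial()`` now exposes whatever ``max_retry_delay`` was passed in, mirroring the neighbouring ``retry_delay`` and ``retry_exponential_backoff`` properties. A minimal sketch of the behaviour, modelled on the "default" case of the test added below (the task id and 60-second delay are illustrative values taken from that test, not a prescribed usage):

    from datetime import timedelta

    from airflow.providers.standard.operators.python import PythonOperator

    # Build a mapped task; max_retry_delay is stored in partial_kwargs.
    op = PythonOperator.partial(
        task_id="mapped",
        python_callable=print,
        max_retry_delay=timedelta(seconds=60),
    ).expand(op_args=["Hello", "world"])

    # With this commit the value is surfaced on the MappedOperator model.
    assert op.max_retry_delay == timedelta(seconds=60)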
diff --git a/airflow-core/tests/unit/models/test_mappedoperator.py b/airflow-core/tests/unit/models/test_mappedoperator.py
index ca21fc2464b..bd20b5e1f4f 100644
--- a/airflow-core/tests/unit/models/test_mappedoperator.py
+++ b/airflow-core/tests/unit/models/test_mappedoperator.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import datetime
from collections import defaultdict
+from datetime import timedelta
from typing import TYPE_CHECKING
from unittest import mock
from unittest.mock import patch
@@ -32,6 +33,8 @@ from airflow.models.taskinstance import TaskInstance
from airflow.models.taskmap import TaskMap
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import DAG, BaseOperator, TaskGroup, setup, task, task_group, teardown
+from airflow.serialization.serialized_objects import SerializedBaseOperator
+from airflow.task.priority_strategy import PriorityWeightStrategy
from airflow.task.trigger_rule import TriggerRule
from airflow.utils.state import TaskInstanceState
@@ -1372,6 +1375,181 @@ class TestMappedSetupTeardown:
}
assert states == expected
+ @pytest.mark.parametrize(
+ (
+ "email,"
+ "execution_timeout,"
+ "retry_delay,"
+ "max_retry_delay,"
+ "retry_exponential_backoff,"
+ "max_active_tis_per_dag,"
+ "max_active_tis_per_dagrun,"
+ "run_as_user,"
+ "resources,"
+ "has_on_execute_callback,"
+ "has_on_failure_callback,"
+ "has_on_retry_callback,"
+ "has_on_success_callback,"
+ "has_on_skipped_callback,"
+ "executor_config,"
+ "inlets,"
+ "outlets,"
+ "doc,"
+ "doc_md,"
+ "doc_json,"
+ "doc_yaml,"
+ "doc_rst"
+ ),
+ [
+ pytest.param(
+ # Default case
+ "email",
+ timedelta(seconds=10),
+ timedelta(seconds=5),
+ timedelta(seconds=60),
+ True,
+ 1,
+ 2,
+ "user",
+ None,
+ False,
+ False,
+ False,
+ False,
+ False,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ id="default",
+ ),
+ pytest.param(
+ # With all optional values and callbacks set
+ None,
+ timedelta(seconds=20),
+ timedelta(seconds=10),
+ timedelta(seconds=120),
+ False,
+ 3,
+ 5,
+ None,
+ {"CPU": 1},
+ True,
+ True,
+ True,
+ True,
+ True,
+ {"key": "value"},
+ ["input_table"],
+ ["output_table"],
+ "Some docs",
+ "MD docs",
+ {"json": True},
+ "yaml: true",
+ "RST docs",
+ id="with-values-and-callbacks",
+ ),
+ ],
+ )
+ def test_properties(
+ self,
+ email,
+ execution_timeout,
+ retry_delay,
+ max_retry_delay,
+ retry_exponential_backoff,
+ max_active_tis_per_dag,
+ max_active_tis_per_dagrun,
+ run_as_user,
+ resources,
+ has_on_execute_callback,
+ has_on_failure_callback,
+ has_on_retry_callback,
+ has_on_success_callback,
+ has_on_skipped_callback,
+ executor_config,
+ inlets,
+ outlets,
+ doc,
+ doc_md,
+ doc_json,
+ doc_yaml,
+ doc_rst,
+ ):
+ op = PythonOperator.partial(
+ task_id="mapped",
+ python_callable=print,
+ email=email,
+ execution_timeout=execution_timeout,
+ retry_delay=retry_delay,
+ max_retry_delay=max_retry_delay,
+ retry_exponential_backoff=retry_exponential_backoff,
+ max_active_tis_per_dag=max_active_tis_per_dag,
+ max_active_tis_per_dagrun=max_active_tis_per_dagrun,
+ run_as_user=run_as_user,
+ resources=resources,
+ on_execute_callback=(lambda: None) if has_on_execute_callback else None,
+ on_failure_callback=(lambda: None) if has_on_failure_callback else None,
+ on_retry_callback=(lambda: None) if has_on_retry_callback else None,
+ on_success_callback=(lambda: None) if has_on_success_callback else None,
+ on_skipped_callback=(lambda: None) if has_on_skipped_callback else None,
+ executor_config=executor_config,
+ inlets=inlets,
+ outlets=outlets,
+ doc=doc,
+ doc_md=doc_md,
+ doc_json=doc_json,
+ doc_yaml=doc_yaml,
+ doc_rst=doc_rst,
+ ).expand(op_args=["Hello", "world"])
+
+ assert op.operator_name == PythonOperator.__name__
+ assert op.roots == [op]
+ assert op.leaves == [op]
+ assert op.task_display_name == "mapped"
+ assert op.owner == SerializedBaseOperator.owner
+ assert op.trigger_rule == SerializedBaseOperator.trigger_rule
+ assert not op.map_index_template
+ assert not op.is_setup
+ assert not op.is_teardown
+ assert not op.depends_on_past
+ assert op.ignore_first_depends_on_past == bool(SerializedBaseOperator.ignore_first_depends_on_past)
+ assert not op.wait_for_downstream
+ assert op.retries == SerializedBaseOperator.retries
+ assert op.queue == SerializedBaseOperator.queue
+ assert op.pool == SerializedBaseOperator.pool
+ assert op.pool_slots == SerializedBaseOperator.pool_slots
+ assert op.priority_weight == SerializedBaseOperator.priority_weight
+ assert isinstance(op.weight_rule, PriorityWeightStrategy)
+ assert op.email == email
+ assert op.execution_timeout == execution_timeout
+ assert op.retry_delay == retry_delay
+ assert op.max_retry_delay == max_retry_delay
+ assert op.retry_exponential_backoff == retry_exponential_backoff
+ assert op.max_active_tis_per_dag == max_active_tis_per_dag
+ assert op.max_active_tis_per_dagrun == max_active_tis_per_dagrun
+ assert op.run_as_user == run_as_user
+ assert op.email_on_failure
+ assert op.email_on_retry
+ assert (op.resources is not None) == bool(resources)
+ assert op.has_on_execute_callback == has_on_execute_callback
+ assert op.has_on_failure_callback == has_on_failure_callback
+ assert op.has_on_retry_callback == has_on_retry_callback
+ assert op.has_on_success_callback == has_on_success_callback
+ assert op.has_on_skipped_callback == has_on_skipped_callback
+ assert (op.executor_config is not None) == bool(executor_config)
+ assert (op.inlets is not None) == bool(inlets)
+ assert (op.outlets is not None) == bool(outlets)
+ assert (op.doc is not None) == bool(doc)
+ assert (op.doc_md is not None) == bool(doc_md)
+ assert (op.doc_json is not None) == bool(doc_json)
+ assert (op.doc_yaml is not None) == bool(doc_yaml)
+ assert (op.doc_rst is not None) == bool(doc_rst)
+
def test_mapped_tasks_in_mapped_task_group_waits_for_upstreams_to_complete(dag_maker, session):
"""Test that one failed trigger rule works well in mapped task group"""