This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 654ccd689b2 [v3-1-test] fix: Add ``max_retry_delay`` to ``MappedOperator`` model (#56396) (#56951)
654ccd689b2 is described below

commit 654ccd689b20eb07230d49f8a02836aba8385162
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 21 16:41:31 2025 +0100

    [v3-1-test] fix: Add ``max_retry_delay`` to ``MappedOperator`` model (#56396) (#56951)
    
    Fixes #56184
    (cherry picked from commit 871ff3a010c91de84af291cdc7ae4fa15964a77f)
    
    Co-authored-by: David Blain <[email protected]>
---
 airflow-core/src/airflow/models/mappedoperator.py  |   4 +
 .../tests/unit/models/test_mappedoperator.py       | 178 +++++++++++++++++++++
 2 files changed, 182 insertions(+)

diff --git a/airflow-core/src/airflow/models/mappedoperator.py b/airflow-core/src/airflow/models/mappedoperator.py
index e7fa851e695..9e7a294e5b5 100644
--- a/airflow-core/src/airflow/models/mappedoperator.py
+++ b/airflow-core/src/airflow/models/mappedoperator.py
@@ -280,6 +280,10 @@ class MappedOperator(DAGNode):
     def retry_exponential_backoff(self) -> bool:
         return bool(self.partial_kwargs.get("retry_exponential_backoff"))
 
+    @property
+    def max_retry_delay(self) -> datetime.timedelta | None:
+        return self.partial_kwargs.get("max_retry_delay")
+
     @property
     def weight_rule(self) -> PriorityWeightStrategy:
         return validate_and_load_priority_weight_strategy(
diff --git a/airflow-core/tests/unit/models/test_mappedoperator.py b/airflow-core/tests/unit/models/test_mappedoperator.py
index ca21fc2464b..bd20b5e1f4f 100644
--- a/airflow-core/tests/unit/models/test_mappedoperator.py
+++ b/airflow-core/tests/unit/models/test_mappedoperator.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 import datetime
 from collections import defaultdict
+from datetime import timedelta
 from typing import TYPE_CHECKING
 from unittest import mock
 from unittest.mock import patch
@@ -32,6 +33,8 @@ from airflow.models.taskinstance import TaskInstance
 from airflow.models.taskmap import TaskMap
 from airflow.providers.standard.operators.python import PythonOperator
 from airflow.sdk import DAG, BaseOperator, TaskGroup, setup, task, task_group, teardown
+from airflow.serialization.serialized_objects import SerializedBaseOperator
+from airflow.task.priority_strategy import PriorityWeightStrategy
 from airflow.task.trigger_rule import TriggerRule
 from airflow.utils.state import TaskInstanceState
 
@@ -1372,6 +1375,181 @@ class TestMappedSetupTeardown:
         }
         assert states == expected
 
+    @pytest.mark.parametrize(
+        (
+            "email,"
+            "execution_timeout,"
+            "retry_delay,"
+            "max_retry_delay,"
+            "retry_exponential_backoff,"
+            "max_active_tis_per_dag,"
+            "max_active_tis_per_dagrun,"
+            "run_as_user,"
+            "resources,"
+            "has_on_execute_callback,"
+            "has_on_failure_callback,"
+            "has_on_retry_callback,"
+            "has_on_success_callback,"
+            "has_on_skipped_callback,"
+            "executor_config,"
+            "inlets,"
+            "outlets,"
+            "doc,"
+            "doc_md,"
+            "doc_json,"
+            "doc_yaml,"
+            "doc_rst"
+        ),
+        [
+            pytest.param(
+                # Default case
+                "email",
+                timedelta(seconds=10),
+                timedelta(seconds=5),
+                timedelta(seconds=60),
+                True,
+                1,
+                2,
+                "user",
+                None,
+                False,
+                False,
+                False,
+                False,
+                False,
+                None,
+                None,
+                None,
+                None,
+                None,
+                None,
+                None,
+                None,
+                id="default",
+            ),
+            pytest.param(
+                # With all optional values and callbacks set
+                None,
+                timedelta(seconds=20),
+                timedelta(seconds=10),
+                timedelta(seconds=120),
+                False,
+                3,
+                5,
+                None,
+                {"CPU": 1},
+                True,
+                True,
+                True,
+                True,
+                True,
+                {"key": "value"},
+                ["input_table"],
+                ["output_table"],
+                "Some docs",
+                "MD docs",
+                {"json": True},
+                "yaml: true",
+                "RST docs",
+                id="with-values-and-callbacks",
+            ),
+        ],
+    )
+    def test_properties(
+        self,
+        email,
+        execution_timeout,
+        retry_delay,
+        max_retry_delay,
+        retry_exponential_backoff,
+        max_active_tis_per_dag,
+        max_active_tis_per_dagrun,
+        run_as_user,
+        resources,
+        has_on_execute_callback,
+        has_on_failure_callback,
+        has_on_retry_callback,
+        has_on_success_callback,
+        has_on_skipped_callback,
+        executor_config,
+        inlets,
+        outlets,
+        doc,
+        doc_md,
+        doc_json,
+        doc_yaml,
+        doc_rst,
+    ):
+        op = PythonOperator.partial(
+            task_id="mapped",
+            python_callable=print,
+            email=email,
+            execution_timeout=execution_timeout,
+            retry_delay=retry_delay,
+            max_retry_delay=max_retry_delay,
+            retry_exponential_backoff=retry_exponential_backoff,
+            max_active_tis_per_dag=max_active_tis_per_dag,
+            max_active_tis_per_dagrun=max_active_tis_per_dagrun,
+            run_as_user=run_as_user,
+            resources=resources,
+            on_execute_callback=(lambda: None) if has_on_execute_callback else None,
+            on_failure_callback=(lambda: None) if has_on_failure_callback else None,
+            on_retry_callback=(lambda: None) if has_on_retry_callback else None,
+            on_success_callback=(lambda: None) if has_on_success_callback else None,
+            on_skipped_callback=(lambda: None) if has_on_skipped_callback else None,
+            executor_config=executor_config,
+            inlets=inlets,
+            outlets=outlets,
+            doc=doc,
+            doc_md=doc_md,
+            doc_json=doc_json,
+            doc_yaml=doc_yaml,
+            doc_rst=doc_rst,
+        ).expand(op_args=["Hello", "world"])
+
+        assert op.operator_name == PythonOperator.__name__
+        assert op.roots == [op]
+        assert op.leaves == [op]
+        assert op.task_display_name == "mapped"
+        assert op.owner == SerializedBaseOperator.owner
+        assert op.trigger_rule == SerializedBaseOperator.trigger_rule
+        assert not op.map_index_template
+        assert not op.is_setup
+        assert not op.is_teardown
+        assert not op.depends_on_past
+        assert op.ignore_first_depends_on_past == bool(SerializedBaseOperator.ignore_first_depends_on_past)
+        assert not op.wait_for_downstream
+        assert op.retries == SerializedBaseOperator.retries
+        assert op.queue == SerializedBaseOperator.queue
+        assert op.pool == SerializedBaseOperator.pool
+        assert op.pool_slots == SerializedBaseOperator.pool_slots
+        assert op.priority_weight == SerializedBaseOperator.priority_weight
+        assert isinstance(op.weight_rule, PriorityWeightStrategy)
+        assert op.email == email
+        assert op.execution_timeout == execution_timeout
+        assert op.retry_delay == retry_delay
+        assert op.max_retry_delay == max_retry_delay
+        assert op.retry_exponential_backoff == retry_exponential_backoff
+        assert op.max_active_tis_per_dag == max_active_tis_per_dag
+        assert op.max_active_tis_per_dagrun == max_active_tis_per_dagrun
+        assert op.run_as_user == run_as_user
+        assert op.email_on_failure
+        assert op.email_on_retry
+        assert (op.resources is not None) == bool(resources)
+        assert op.has_on_execute_callback == has_on_execute_callback
+        assert op.has_on_failure_callback == has_on_failure_callback
+        assert op.has_on_retry_callback == has_on_retry_callback
+        assert op.has_on_success_callback == has_on_success_callback
+        assert op.has_on_skipped_callback == has_on_skipped_callback
+        assert (op.executor_config is not None) == bool(executor_config)
+        assert (op.inlets is not None) == bool(inlets)
+        assert (op.outlets is not None) == bool(outlets)
+        assert (op.doc is not None) == bool(doc)
+        assert (op.doc_md is not None) == bool(doc_md)
+        assert (op.doc_json is not None) == bool(doc_json)
+        assert (op.doc_yaml is not None) == bool(doc_yaml)
+        assert (op.doc_rst is not None) == bool(doc_rst)
+
 
 def test_mapped_tasks_in_mapped_task_group_waits_for_upstreams_to_complete(dag_maker, session):
     """Test that one failed trigger rule works well in mapped task group"""

Reply via email to