This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ab5bb63fc0 Fix KeyError when accessing retry_delay on MappedOperator without explicit value (#56605)
1ab5bb63fc0 is described below

commit 1ab5bb63fc00257bf492e1ef2fe71b4c2ca067a5
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Wed Oct 15 18:38:41 2025 -0500

    Fix KeyError when accessing retry_delay on MappedOperator without explicit value (#56605)
    
    
      When querying the API endpoint for mapped tasks (e.g., /api/v2/dags/{dag_id}/tasks/{task_id}),
      the server returned a 500 Internal Server Error if the mapped task didn't have an explicit
      retry_delay set. The retry_delay property was using direct dictionary access which raised
      a KeyError during serialization.
    
      Changed to use .get() with the default value from SerializedBaseOperator (300 seconds)
---
 airflow-core/src/airflow/models/mappedoperator.py  |  2 +-
 .../tests/unit/models/test_mappedoperator.py       | 43 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/models/mappedoperator.py b/airflow-core/src/airflow/models/mappedoperator.py
index b29877e9228..e7fa851e695 100644
--- a/airflow-core/src/airflow/models/mappedoperator.py
+++ b/airflow-core/src/airflow/models/mappedoperator.py
@@ -274,7 +274,7 @@ class MappedOperator(DAGNode):
 
     @property
     def retry_delay(self) -> datetime.timedelta:
-        return self.partial_kwargs["retry_delay"]
+        return self.partial_kwargs.get("retry_delay", SerializedBaseOperator.retry_delay)
 
     @property
     def retry_exponential_backoff(self) -> bool:
diff --git a/airflow-core/tests/unit/models/test_mappedoperator.py b/airflow-core/tests/unit/models/test_mappedoperator.py
index 84d05b3b932..ca21fc2464b 100644
--- a/airflow-core/tests/unit/models/test_mappedoperator.py
+++ b/airflow-core/tests/unit/models/test_mappedoperator.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import datetime
 from collections import defaultdict
 from typing import TYPE_CHECKING
 from unittest import mock
@@ -1401,3 +1402,45 @@ def test_mapped_tasks_in_mapped_task_group_waits_for_upstreams_to_complete(dag_m
     dr.task_instance_scheduling_decisions()
     ti3 = dr.get_task_instance(task_id="tg1.t3")
     assert not ti3.state
+
+
+def test_mapped_operator_retry_delay_default(dag_maker):
+    """
+    Test that MappedOperator.retry_delay returns default value when not explicitly set.
+
+    This test verifies the fix for a KeyError that occurred when accessing retry_delay
+    on a MappedOperator without an explicit retry_delay value in partial_kwargs.
+    The property should fall back to SerializedBaseOperator.retry_delay (300 seconds).
+    """
+    with dag_maker(dag_id="test_retry_delay", serialized=True) as dag:
+        # Create a mapped operator without explicitly setting retry_delay
+        MockOperator.partial(task_id="mapped_task").expand(arg2=[1, 2, 3])
+
+    # Get the deserialized mapped task
+    mapped_deser = dag.task_dict["mapped_task"]
+
+    # Accessing retry_delay should not raise KeyError
+    # and should return the default value (300 seconds)
+    assert mapped_deser.retry_delay == datetime.timedelta(seconds=300)
+
+
+def test_mapped_operator_retry_delay_explicit(dag_maker):
+    """
+    Test that MappedOperator.retry_delay returns explicit value when set.
+
+    This test verifies that when retry_delay is explicitly set in partial(),
+    the MappedOperator returns that value instead of the default.
+    """
+    custom_retry_delay = datetime.timedelta(seconds=600)
+
+    with dag_maker(dag_id="test_retry_delay_explicit", serialized=True) as dag:
+        # Create a mapped operator with explicit retry_delay
+        MockOperator.partial(task_id="mapped_task_with_retry", retry_delay=custom_retry_delay).expand(
+            arg2=[1, 2, 3]
+        )
+
+    # Get the deserialized mapped task
+    mapped_deser = dag.task_dict["mapped_task_with_retry"]
+
+    # Should return the explicitly set value
+    assert mapped_deser.retry_delay == custom_retry_delay

Reply via email to