This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1ab5bb63fc0 Fix KeyError when accessing retry_delay on MappedOperator without explicit value (#56605)
1ab5bb63fc0 is described below
commit 1ab5bb63fc00257bf492e1ef2fe71b4c2ca067a5
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Wed Oct 15 18:38:41 2025 -0500
Fix KeyError when accessing retry_delay on MappedOperator without explicit value (#56605)
When querying the API endpoint for mapped tasks (e.g.,
/api/v2/dags/{dag_id}/tasks/{task_id}), the server returned a 500 Internal
Server Error if the mapped task didn't have an explicit retry_delay set. The
retry_delay property was using direct dictionary access, which raised a
KeyError during serialization.
Changed to use .get() with the default value from SerializedBaseOperator
(300 seconds).
---
airflow-core/src/airflow/models/mappedoperator.py | 2 +-
.../tests/unit/models/test_mappedoperator.py | 43 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/models/mappedoperator.py b/airflow-core/src/airflow/models/mappedoperator.py
index b29877e9228..e7fa851e695 100644
--- a/airflow-core/src/airflow/models/mappedoperator.py
+++ b/airflow-core/src/airflow/models/mappedoperator.py
@@ -274,7 +274,7 @@ class MappedOperator(DAGNode):
@property
def retry_delay(self) -> datetime.timedelta:
- return self.partial_kwargs["retry_delay"]
+ return self.partial_kwargs.get("retry_delay",
SerializedBaseOperator.retry_delay)
@property
def retry_exponential_backoff(self) -> bool:
diff --git a/airflow-core/tests/unit/models/test_mappedoperator.py b/airflow-core/tests/unit/models/test_mappedoperator.py
index 84d05b3b932..ca21fc2464b 100644
--- a/airflow-core/tests/unit/models/test_mappedoperator.py
+++ b/airflow-core/tests/unit/models/test_mappedoperator.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+import datetime
from collections import defaultdict
from typing import TYPE_CHECKING
from unittest import mock
@@ -1401,3 +1402,45 @@ def
test_mapped_tasks_in_mapped_task_group_waits_for_upstreams_to_complete(dag_m
dr.task_instance_scheduling_decisions()
ti3 = dr.get_task_instance(task_id="tg1.t3")
assert not ti3.state
+
+
+def test_mapped_operator_retry_delay_default(dag_maker):
+ """
+ Test that MappedOperator.retry_delay returns default value when not
explicitly set.
+
+ This test verifies the fix for a KeyError that occurred when accessing
retry_delay
+ on a MappedOperator without an explicit retry_delay value in
partial_kwargs.
+ The property should fall back to SerializedBaseOperator.retry_delay (300
seconds).
+ """
+ with dag_maker(dag_id="test_retry_delay", serialized=True) as dag:
+ # Create a mapped operator without explicitly setting retry_delay
+ MockOperator.partial(task_id="mapped_task").expand(arg2=[1, 2, 3])
+
+ # Get the deserialized mapped task
+ mapped_deser = dag.task_dict["mapped_task"]
+
+ # Accessing retry_delay should not raise KeyError
+ # and should return the default value (300 seconds)
+ assert mapped_deser.retry_delay == datetime.timedelta(seconds=300)
+
+
+def test_mapped_operator_retry_delay_explicit(dag_maker):
+ """
+ Test that MappedOperator.retry_delay returns explicit value when set.
+
+ This test verifies that when retry_delay is explicitly set in partial(),
+ the MappedOperator returns that value instead of the default.
+ """
+ custom_retry_delay = datetime.timedelta(seconds=600)
+
+ with dag_maker(dag_id="test_retry_delay_explicit", serialized=True) as dag:
+ # Create a mapped operator with explicit retry_delay
+ MockOperator.partial(task_id="mapped_task_with_retry",
retry_delay=custom_retry_delay).expand(
+ arg2=[1, 2, 3]
+ )
+
+ # Get the deserialized mapped task
+ mapped_deser = dag.task_dict["mapped_task_with_retry"]
+
+ # Should return the explicitly set value
+ assert mapped_deser.retry_delay == custom_retry_delay