This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 51e98df6519 fix(providers/standard): add response_timeout to HITLOperator to prevent race with execution_timeout (#63475)
51e98df6519 is described below
commit 51e98df6519ea570bcbbd96e36af5aacfb4e71ea
Author: Antonio Mello <[email protected]>
AuthorDate: Mon Mar 23 22:29:21 2026 -0300
fix(providers/standard): add response_timeout to HITLOperator to prevent
race with execution_timeout (#63475)
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../standard/example_dags/example_hitl_operator.py | 4 +--
.../airflow/providers/standard/operators/hitl.py | 27 +++++++++++++++--
.../tests/unit/standard/operators/test_hitl.py | 35 +++++++++++++++++++---
3 files changed, 58 insertions(+), 8 deletions(-)
diff --git a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
index 96404a0ebc1..dc26aa15708 100644
--- a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
+++ b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
@@ -118,7 +118,7 @@ with DAG(
subject="Please choose option to proceed: ",
options=["option 7", "option 8", "option 9"],
defaults=["option 7"],
- execution_timeout=datetime.timedelta(seconds=1),
+ response_timeout=datetime.timedelta(seconds=1),
notifiers=[hitl_request_callback],
on_success_callback=hitl_success_callback,
on_failure_callback=hitl_failure_callback,
@@ -136,7 +136,7 @@ with DAG(
Timeout Option: {{
ti.xcom_pull(task_ids='wait_for_default_option')["chosen_options"] }}
""",
defaults="Reject",
- execution_timeout=datetime.timedelta(minutes=5),
+ response_timeout=datetime.timedelta(minutes=5),
notifiers=[hitl_request_callback],
on_success_callback=hitl_success_callback,
on_failure_callback=hitl_failure_callback,
diff --git a/providers/standard/src/airflow/providers/standard/operators/hitl.py b/providers/standard/src/airflow/providers/standard/operators/hitl.py
index b009422be8d..d3199a7b0a1 100644
--- a/providers/standard/src/airflow/providers/standard/operators/hitl.py
+++ b/providers/standard/src/airflow/providers/standard/operators/hitl.py
@@ -17,7 +17,9 @@
from __future__ import annotations
import logging
+import warnings
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
from airflow.providers.standard.version_compat import AIRFLOW_V_3_1_3_PLUS, AIRFLOW_V_3_1_PLUS
@@ -25,6 +27,7 @@ if not AIRFLOW_V_3_1_PLUS:
raise AirflowOptionalProviderFeatureException("Human in the loop functionality needs Airflow 3.1+.")
from collections.abc import Collection, Mapping, Sequence
+from datetime import timedelta
from typing import TYPE_CHECKING, Any
from urllib.parse import ParseResult, urlencode, urlparse, urlunparse
@@ -55,6 +58,9 @@ class HITLOperator(BaseOperator):
:param params: dictionary of parameter definitions that are in the format
of Dag params such that
a Form Field can be rendered. Entered data is validated (schema,
required fields) like for a Dag run
and added to XCom of the task result.
+ :param response_timeout: Maximum time to wait for a human response after deferring to the trigger.
+ This is separate from ``execution_timeout`` which controls the pre-defer execution phase.
+ If not set, no timeout is applied to the human response wait.
"""
template_fields: Collection[str] = ("subject", "body")
@@ -70,9 +76,26 @@ class HITLOperator(BaseOperator):
params: ParamsDict | dict[str, Any] | None = None,
notifiers: Sequence[BaseNotifier] | BaseNotifier | None = None,
assigned_users: HITLUser | list[HITLUser] | None = None,
+ response_timeout: timedelta | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
+
+ # Handle backward compatibility: if execution_timeout is set but response_timeout is not,
+ # migrate execution_timeout to response_timeout and clear it to prevent the BaseOperator
+ # timeout from racing the defer() call.
+ if self.execution_timeout and not response_timeout:
+ warnings.warn(
+ "Passing `execution_timeout` to HITLOperator to control the human response wait is "
+ "deprecated. Use `response_timeout` instead. `execution_timeout` will be cleared to "
+ "prevent it from killing the task before defer() is reached.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ response_timeout = self.execution_timeout
+ self.execution_timeout = None
+
+ self.response_timeout = response_timeout
self.subject = subject
self.body = body
@@ -160,8 +183,8 @@ class HITLOperator(BaseOperator):
assigned_users=self.assigned_users,
)
- if self.execution_timeout:
- timeout_datetime = utcnow() + self.execution_timeout
+ if self.response_timeout:
+ timeout_datetime = utcnow() + self.response_timeout
else:
timeout_datetime = None
diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py
b/providers/standard/tests/unit/standard/operators/test_hitl.py
index 6a71c3d1d94..e1ebae5ab00 100644
--- a/providers/standard/tests/unit/standard/operators/test_hitl.py
+++ b/providers/standard/tests/unit/standard/operators/test_hitl.py
@@ -32,6 +32,7 @@ from urllib.parse import parse_qs, urlparse
from sqlalchemy import select
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import TaskInstance, Trigger
from airflow.models.hitl import HITLDetail
from airflow.providers.common.compat.sdk import AirflowException,
DownstreamTasksSkipped, ParamValidationError
@@ -1028,13 +1029,13 @@ class TestHITLSummaryForListeners:
"serialized_params": None,
}
- def test_execute_enriches_summary_with_timeout(self) -> None:
- """execute() adds timeout_datetime; all other init keys remain."""
+ def test_execute_enriches_summary_with_response_timeout(self) -> None:
+ """execute() adds timeout_datetime using response_timeout; all other
init keys remain."""
op = HITLOperator(
task_id="test",
subject="Review",
options=["OK"],
- execution_timeout=datetime.timedelta(minutes=10),
+ response_timeout=datetime.timedelta(minutes=10),
)
with (
@@ -1084,6 +1085,32 @@ class TestHITLSummaryForListeners:
"timeout_datetime": None,
}
+ def test_execution_timeout_deprecated_and_migrated(self) -> None:
+ """execution_timeout is migrated to response_timeout with a deprecation warning."""
+ with pytest.warns(AirflowProviderDeprecationWarning, match="Use `response_timeout` instead"):
+ op = HITLOperator(
+ task_id="test",
+ subject="Review",
+ options=["OK"],
+ execution_timeout=datetime.timedelta(minutes=10),
+ )
+
+ assert op.response_timeout == datetime.timedelta(minutes=10)
+ assert op.execution_timeout is None
+
+ def test_response_timeout_does_not_clear_execution_timeout(self) -> None:
+ """When response_timeout is set, execution_timeout is left
untouched."""
+ op = HITLOperator(
+ task_id="test",
+ subject="Review",
+ options=["OK"],
+ response_timeout=datetime.timedelta(minutes=5),
+ execution_timeout=datetime.timedelta(minutes=30),
+ )
+
+ assert op.response_timeout == datetime.timedelta(minutes=5)
+ assert op.execution_timeout == datetime.timedelta(minutes=30)
+
def test_hitl_operator_execute_complete_enriches_summary(self) -> None:
"""execute_complete() adds response fields directly into
hitl_summary."""
op = HITLOperator(
@@ -1257,7 +1284,7 @@ class TestHITLSummaryForListeners:
task_id="test",
subject="Release v2.0?",
body="Please approve the production deployment.",
- execution_timeout=datetime.timedelta(minutes=30),
+ response_timeout=datetime.timedelta(minutes=30),
)
# -- After __init__: only base + approval keys --