This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 51e98df6519 fix(providers/standard): add response_timeout to HITLOperator to prevent race with execution_timeout (#63475)
51e98df6519 is described below

commit 51e98df6519ea570bcbbd96e36af5aacfb4e71ea
Author: Antonio Mello <[email protected]>
AuthorDate: Mon Mar 23 22:29:21 2026 -0300

    fix(providers/standard): add response_timeout to HITLOperator to prevent race with execution_timeout (#63475)
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../standard/example_dags/example_hitl_operator.py |  4 +--
 .../airflow/providers/standard/operators/hitl.py   | 27 +++++++++++++++--
 .../tests/unit/standard/operators/test_hitl.py     | 35 +++++++++++++++++++---
 3 files changed, 58 insertions(+), 8 deletions(-)

diff --git a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
index 96404a0ebc1..dc26aa15708 100644
--- a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
+++ b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
@@ -118,7 +118,7 @@ with DAG(
         subject="Please choose option to proceed: ",
         options=["option 7", "option 8", "option 9"],
         defaults=["option 7"],
-        execution_timeout=datetime.timedelta(seconds=1),
+        response_timeout=datetime.timedelta(seconds=1),
         notifiers=[hitl_request_callback],
         on_success_callback=hitl_success_callback,
         on_failure_callback=hitl_failure_callback,
@@ -136,7 +136,7 @@ with DAG(
         Timeout Option: {{ ti.xcom_pull(task_ids='wait_for_default_option')["chosen_options"] }}
         """,
         defaults="Reject",
-        execution_timeout=datetime.timedelta(minutes=5),
+        response_timeout=datetime.timedelta(minutes=5),
         notifiers=[hitl_request_callback],
         on_success_callback=hitl_success_callback,
         on_failure_callback=hitl_failure_callback,
diff --git a/providers/standard/src/airflow/providers/standard/operators/hitl.py b/providers/standard/src/airflow/providers/standard/operators/hitl.py
index b009422be8d..d3199a7b0a1 100644
--- a/providers/standard/src/airflow/providers/standard/operators/hitl.py
+++ b/providers/standard/src/airflow/providers/standard/operators/hitl.py
@@ -17,7 +17,9 @@
 from __future__ import annotations
 
 import logging
+import warnings
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
 from airflow.providers.standard.version_compat import AIRFLOW_V_3_1_3_PLUS, AIRFLOW_V_3_1_PLUS
 
@@ -25,6 +27,7 @@ if not AIRFLOW_V_3_1_PLUS:
     raise AirflowOptionalProviderFeatureException("Human in the loop functionality needs Airflow 3.1+.")
 
 from collections.abc import Collection, Mapping, Sequence
+from datetime import timedelta
 from typing import TYPE_CHECKING, Any
 from urllib.parse import ParseResult, urlencode, urlparse, urlunparse
 
@@ -55,6 +58,9 @@ class HITLOperator(BaseOperator):
     :param params: dictionary of parameter definitions that are in the format of Dag params such that
         a Form Field can be rendered. Entered data is validated (schema, required fields) like for a Dag run
         and added to XCom of the task result.
+    :param response_timeout: Maximum time to wait for a human response after deferring to the trigger.
+        This is separate from ``execution_timeout`` which controls the pre-defer execution phase.
+        If not set, no timeout is applied to the human response wait.
     """
 
     template_fields: Collection[str] = ("subject", "body")
@@ -70,9 +76,26 @@ class HITLOperator(BaseOperator):
         params: ParamsDict | dict[str, Any] | None = None,
         notifiers: Sequence[BaseNotifier] | BaseNotifier | None = None,
         assigned_users: HITLUser | list[HITLUser] | None = None,
+        response_timeout: timedelta | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
+
+        # Handle backward compatibility: if execution_timeout is set but response_timeout is not,
+        # migrate execution_timeout to response_timeout and clear it to prevent the BaseOperator
+        # timeout from racing the defer() call.
+        if self.execution_timeout and not response_timeout:
+            warnings.warn(
+                "Passing `execution_timeout` to HITLOperator to control the human response wait is "
+                "deprecated. Use `response_timeout` instead. `execution_timeout` will be cleared to "
+                "prevent it from killing the task before defer() is reached.",
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
+            response_timeout = self.execution_timeout
+            self.execution_timeout = None
+
+        self.response_timeout = response_timeout
         self.subject = subject
         self.body = body
 
@@ -160,8 +183,8 @@ class HITLOperator(BaseOperator):
             assigned_users=self.assigned_users,
         )
 
-        if self.execution_timeout:
-            timeout_datetime = utcnow() + self.execution_timeout
+        if self.response_timeout:
+            timeout_datetime = utcnow() + self.response_timeout
         else:
             timeout_datetime = None
 
diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py b/providers/standard/tests/unit/standard/operators/test_hitl.py
index 6a71c3d1d94..e1ebae5ab00 100644
--- a/providers/standard/tests/unit/standard/operators/test_hitl.py
+++ b/providers/standard/tests/unit/standard/operators/test_hitl.py
@@ -32,6 +32,7 @@ from urllib.parse import parse_qs, urlparse
 
 from sqlalchemy import select
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.models import TaskInstance, Trigger
 from airflow.models.hitl import HITLDetail
 from airflow.providers.common.compat.sdk import AirflowException, DownstreamTasksSkipped, ParamValidationError
@@ -1028,13 +1029,13 @@ class TestHITLSummaryForListeners:
             "serialized_params": None,
         }
 
-    def test_execute_enriches_summary_with_timeout(self) -> None:
-        """execute() adds timeout_datetime; all other init keys remain."""
+    def test_execute_enriches_summary_with_response_timeout(self) -> None:
+        """execute() adds timeout_datetime using response_timeout; all other init keys remain."""
         op = HITLOperator(
             task_id="test",
             subject="Review",
             options=["OK"],
-            execution_timeout=datetime.timedelta(minutes=10),
+            response_timeout=datetime.timedelta(minutes=10),
         )
 
         with (
@@ -1084,6 +1085,32 @@ class TestHITLSummaryForListeners:
             "timeout_datetime": None,
         }
 
+    def test_execution_timeout_deprecated_and_migrated(self) -> None:
+        """execution_timeout is migrated to response_timeout with a deprecation warning."""
+        with pytest.warns(AirflowProviderDeprecationWarning, match="Use `response_timeout` instead"):
+            op = HITLOperator(
+                task_id="test",
+                subject="Review",
+                options=["OK"],
+                execution_timeout=datetime.timedelta(minutes=10),
+            )
+
+        assert op.response_timeout == datetime.timedelta(minutes=10)
+        assert op.execution_timeout is None
+
+    def test_response_timeout_does_not_clear_execution_timeout(self) -> None:
+        """When response_timeout is set, execution_timeout is left untouched."""
+        op = HITLOperator(
+            task_id="test",
+            subject="Review",
+            options=["OK"],
+            response_timeout=datetime.timedelta(minutes=5),
+            execution_timeout=datetime.timedelta(minutes=30),
+        )
+
+        assert op.response_timeout == datetime.timedelta(minutes=5)
+        assert op.execution_timeout == datetime.timedelta(minutes=30)
+
     def test_hitl_operator_execute_complete_enriches_summary(self) -> None:
         """execute_complete() adds response fields directly into hitl_summary."""
         op = HITLOperator(
@@ -1257,7 +1284,7 @@ class TestHITLSummaryForListeners:
             task_id="test",
             subject="Release v2.0?",
             body="Please approve the production deployment.",
-            execution_timeout=datetime.timedelta(minutes=30),
+            response_timeout=datetime.timedelta(minutes=30),
         )
 
         # -- After __init__: only base + approval keys --

Reply via email to