This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new d5a6971ba5e [v3-1-test] Fix HITL operators failing when using notifiers (#57494) (#57551)
d5a6971ba5e is described below
commit d5a6971ba5eec8b47a74c0fd500e21ff643ee1b0
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 30 18:45:35 2025 +0530
[v3-1-test] Fix HITL operators failing when using notifiers (#57494) (#57551)
(cherry picked from commit 552e60505a9473f32494a6bb3dd3078f9b1c038a)
Co-authored-by: Amogh Desai <[email protected]>
---
task-sdk/src/airflow/sdk/api/client.py | 6 +++---
task-sdk/src/airflow/sdk/execution_time/comms.py | 11 +++++++++++
task-sdk/src/airflow/sdk/execution_time/supervisor.py | 6 ++++--
task-sdk/tests/task_sdk/api/test_client.py | 4 ++--
task-sdk/tests/task_sdk/execution_time/test_supervisor.py | 2 --
5 files changed, 20 insertions(+), 9 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py
index bc735f68474..f4476322e8a 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -43,6 +43,7 @@ from airflow.sdk.api.datamodels._generated import (
ConnectionResponse,
DagRunStateResponse,
DagRunType,
+ HITLDetailRequest,
HITLDetailResponse,
HITLUser,
InactiveAssetsResponse,
@@ -72,7 +73,6 @@ from airflow.sdk.execution_time.comms import (
CreateHITLDetailPayload,
DRCount,
ErrorResponse,
- HITLDetailRequestResult,
OKResponse,
PreviousDagRunResult,
SkipDownstreamTasks,
@@ -725,7 +725,7 @@ class HITLOperations:
multiple: bool = False,
params: dict[str, Any] | None = None,
assigned_users: list[HITLUser] | None = None,
- ) -> HITLDetailRequestResult:
+ ) -> HITLDetailRequest:
"""Add a Human-in-the-loop response that waits for human response for a specific Task Instance."""
payload = CreateHITLDetailPayload(
ti_id=ti_id,
@@ -741,7 +741,7 @@ class HITLOperations:
f"/hitlDetails/{ti_id}",
content=payload.model_dump_json(),
)
- return HITLDetailRequestResult.model_validate_json(resp.read())
+ return HITLDetailRequest.model_validate_json(resp.read())
def update_response(
self,
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py b/task-sdk/src/airflow/sdk/execution_time/comms.py
index bfa9fb012ae..fe6435ca7f8 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -584,6 +584,17 @@ class HITLDetailRequestResult(HITLDetailRequest):
type: Literal["HITLDetailRequestResult"] = "HITLDetailRequestResult"
+ @classmethod
+ def from_api_response(cls, hitl_request: HITLDetailRequest) -> HITLDetailRequestResult:
+ """
+ Get HITLDetailRequestResult from HITLDetailRequest (API response).
+
+ HITLDetailRequest is the API response model. We convert it to HITLDetailRequestResult
+ for communication between the Supervisor and task process, adding the discriminator field
+ required for the tagged union deserialization.
+ """
+ return cls(**hitl_request.model_dump(exclude_defaults=True), type="HITLDetailRequestResult")
+
ToTask = Annotated[
AssetResult
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 6bf609f1139..fafd56209bd 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -93,6 +93,7 @@ from airflow.sdk.execution_time.comms import (
GetXComCount,
GetXComSequenceItem,
GetXComSequenceSlice,
+ HITLDetailRequestResult,
InactiveAssetsResult,
MaskSecret,
PrevSuccessfulDagRunResult,
@@ -1352,7 +1353,7 @@ class ActivitySubprocess(WatchedSubprocess):
# Since we've sent the message, return. Nothing else in this ifelse/switch should return directly
return
elif isinstance(msg, CreateHITLDetailPayload):
- resp = self.client.hitl.add_response(
+ hitl_detail_request = self.client.hitl.add_response(
ti_id=msg.ti_id,
options=msg.options,
subject=msg.subject,
@@ -1362,7 +1363,8 @@ class ActivitySubprocess(WatchedSubprocess):
multiple=msg.multiple,
assigned_users=msg.assigned_users,
)
- self.send_msg(resp, request_id=req_id, error=None, **dump_opts)
+ resp = HITLDetailRequestResult.from_api_response(hitl_detail_request)
+ dump_opts = {"exclude_unset": True}
elif isinstance(msg, MaskSecret):
mask_secret(msg.value, msg.name)
else:
diff --git a/task-sdk/tests/task_sdk/api/test_client.py b/task-sdk/tests/task_sdk/api/test_client.py
index e73f05eeac8..e5322519fbf 100644
--- a/task-sdk/tests/task_sdk/api/test_client.py
+++ b/task-sdk/tests/task_sdk/api/test_client.py
@@ -38,6 +38,7 @@ from airflow.sdk.api.datamodels._generated import (
ConnectionResponse,
DagRunState,
DagRunStateResponse,
+ HITLDetailRequest,
HITLDetailResponse,
HITLUser,
VariableResponse,
@@ -47,7 +48,6 @@ from airflow.sdk.exceptions import ErrorType
from airflow.sdk.execution_time.comms import (
DeferTask,
ErrorResponse,
- HITLDetailRequestResult,
OKResponse,
PreviousDagRunResult,
RescheduleTask,
@@ -1300,7 +1300,7 @@ class TestHITLOperations:
params=None,
multiple=False,
)
- assert isinstance(result, HITLDetailRequestResult)
+ assert isinstance(result, HITLDetailRequest)
assert result.ti_id == ti_id
assert result.options == ["Approval", "Reject"]
assert result.subject == "This is subject"
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 7c9828a29fd..bfd8caa3b47 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -2030,9 +2030,7 @@ REQUEST_TEST_CASES = [
"subject": "This is subject",
"body": "This is body",
"defaults": ["Approve"],
- "multiple": False,
"params": {},
- "assigned_users": None,
"type": "HITLDetailRequestResult",
},
client_mock=ClientMock(