This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new d5a6971ba5e [v3-1-test] Fix HITL operators failing when using notifiers (#57494) (#57551)
d5a6971ba5e is described below

commit d5a6971ba5eec8b47a74c0fd500e21ff643ee1b0
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 30 18:45:35 2025 +0530

    [v3-1-test] Fix HITL operators failing when using notifiers (#57494) (#57551)
    
    (cherry picked from commit 552e60505a9473f32494a6bb3dd3078f9b1c038a)
    
    Co-authored-by: Amogh Desai <[email protected]>
---
 task-sdk/src/airflow/sdk/api/client.py                    |  6 +++---
 task-sdk/src/airflow/sdk/execution_time/comms.py          | 11 +++++++++++
 task-sdk/src/airflow/sdk/execution_time/supervisor.py     |  6 ++++--
 task-sdk/tests/task_sdk/api/test_client.py                |  4 ++--
 task-sdk/tests/task_sdk/execution_time/test_supervisor.py |  2 --
 5 files changed, 20 insertions(+), 9 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py
index bc735f68474..f4476322e8a 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -43,6 +43,7 @@ from airflow.sdk.api.datamodels._generated import (
     ConnectionResponse,
     DagRunStateResponse,
     DagRunType,
+    HITLDetailRequest,
     HITLDetailResponse,
     HITLUser,
     InactiveAssetsResponse,
@@ -72,7 +73,6 @@ from airflow.sdk.execution_time.comms import (
     CreateHITLDetailPayload,
     DRCount,
     ErrorResponse,
-    HITLDetailRequestResult,
     OKResponse,
     PreviousDagRunResult,
     SkipDownstreamTasks,
@@ -725,7 +725,7 @@ class HITLOperations:
         multiple: bool = False,
         params: dict[str, Any] | None = None,
         assigned_users: list[HITLUser] | None = None,
-    ) -> HITLDetailRequestResult:
+    ) -> HITLDetailRequest:
         """Add a Human-in-the-loop response that waits for human response for a specific Task Instance."""
         payload = CreateHITLDetailPayload(
             ti_id=ti_id,
@@ -741,7 +741,7 @@ class HITLOperations:
             f"/hitlDetails/{ti_id}",
             content=payload.model_dump_json(),
         )
-        return HITLDetailRequestResult.model_validate_json(resp.read())
+        return HITLDetailRequest.model_validate_json(resp.read())
 
     def update_response(
         self,
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py b/task-sdk/src/airflow/sdk/execution_time/comms.py
index bfa9fb012ae..fe6435ca7f8 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -584,6 +584,17 @@ class HITLDetailRequestResult(HITLDetailRequest):
 
     type: Literal["HITLDetailRequestResult"] = "HITLDetailRequestResult"
 
+    @classmethod
+    def from_api_response(cls, hitl_request: HITLDetailRequest) -> HITLDetailRequestResult:
+        """
+        Get HITLDetailRequestResult from HITLDetailRequest (API response).
+
+        HITLDetailRequest is the API response model. We convert it to HITLDetailRequestResult
+        for communication between the Supervisor and task process, adding the discriminator field
+        required for the tagged union deserialization.
+        """
+        return cls(**hitl_request.model_dump(exclude_defaults=True), type="HITLDetailRequestResult")
+
 
 ToTask = Annotated[
     AssetResult
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 6bf609f1139..fafd56209bd 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -93,6 +93,7 @@ from airflow.sdk.execution_time.comms import (
     GetXComCount,
     GetXComSequenceItem,
     GetXComSequenceSlice,
+    HITLDetailRequestResult,
     InactiveAssetsResult,
     MaskSecret,
     PrevSuccessfulDagRunResult,
@@ -1352,7 +1353,7 @@ class ActivitySubprocess(WatchedSubprocess):
                 # Since we've sent the message, return. Nothing else in this ifelse/switch should return directly
                 return
         elif isinstance(msg, CreateHITLDetailPayload):
-            resp = self.client.hitl.add_response(
+            hitl_detail_request = self.client.hitl.add_response(
                 ti_id=msg.ti_id,
                 options=msg.options,
                 subject=msg.subject,
@@ -1362,7 +1363,8 @@ class ActivitySubprocess(WatchedSubprocess):
                 multiple=msg.multiple,
                 assigned_users=msg.assigned_users,
             )
-            self.send_msg(resp, request_id=req_id, error=None, **dump_opts)
+            resp = HITLDetailRequestResult.from_api_response(hitl_detail_request)
+            dump_opts = {"exclude_unset": True}
         elif isinstance(msg, MaskSecret):
             mask_secret(msg.value, msg.name)
         else:
diff --git a/task-sdk/tests/task_sdk/api/test_client.py b/task-sdk/tests/task_sdk/api/test_client.py
index e73f05eeac8..e5322519fbf 100644
--- a/task-sdk/tests/task_sdk/api/test_client.py
+++ b/task-sdk/tests/task_sdk/api/test_client.py
@@ -38,6 +38,7 @@ from airflow.sdk.api.datamodels._generated import (
     ConnectionResponse,
     DagRunState,
     DagRunStateResponse,
+    HITLDetailRequest,
     HITLDetailResponse,
     HITLUser,
     VariableResponse,
@@ -47,7 +48,6 @@ from airflow.sdk.exceptions import ErrorType
 from airflow.sdk.execution_time.comms import (
     DeferTask,
     ErrorResponse,
-    HITLDetailRequestResult,
     OKResponse,
     PreviousDagRunResult,
     RescheduleTask,
@@ -1300,7 +1300,7 @@ class TestHITLOperations:
             params=None,
             multiple=False,
         )
-        assert isinstance(result, HITLDetailRequestResult)
+        assert isinstance(result, HITLDetailRequest)
         assert result.ti_id == ti_id
         assert result.options == ["Approval", "Reject"]
         assert result.subject == "This is subject"
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 7c9828a29fd..bfd8caa3b47 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -2030,9 +2030,7 @@ REQUEST_TEST_CASES = [
             "subject": "This is subject",
             "body": "This is body",
             "defaults": ["Approve"],
-            "multiple": False,
             "params": {},
-            "assigned_users": None,
             "type": "HITLDetailRequestResult",
         },
         client_mock=ClientMock(

Reply via email to