jscheffl commented on code in PR #67226:
URL: https://github.com/apache/airflow/pull/67226#discussion_r3277028430
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -1074,12 +1074,19 @@ def trigger_reentry(self, context: Context, event:
dict[str, Any]) -> Any:
"Trigger emitted an %s event, failing the task: %s",
event["status"], event["message"]
)
message = event.get("stack_trace", event["message"])
+ # Push manually before the raise — matches the sync-path
+ # failure-push in cleanup ("Ensure that existing XCom is
+ # pushed even in case of failure").
+ if self.do_xcom_push and xcom_sidecar_output:
+ context["ti"].xcom_push(XCOM_RETURN_KEY,
xcom_sidecar_output)
Review Comment:
Thanks for adding for consistency. But now you are repeating the same "bug"
like it was before and as you're trying to fix? In the case of failure in async
it is the same (bad) behavior like in the sync call that multiple outputs is
not handled.
Can you add a utility that is in both ends consistently used that minics the
task runner handling for multiple outputs or use a task runner public method
for this?
##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py:
##########
@@ -3451,13 +3451,17 @@ def
test_async_kpo_wait_termination_before_cleanup_on_success(
# check if it gets the pod
mocked_hook.return_value.get_pod.assert_called_once_with(TEST_NAME,
TEST_NAMESPACE)
- # assert that the xcom are extracted/not extracted
+ # On the success path, ``trigger_reentry`` returns the sidecar output and
+ # leaves the XCom push to the task runner's ``_push_xcom_if_needed`` —
+ # see #67224. The operator no longer pushes ``return_value`` manually here.
Review Comment:
I think no comment needed on bugifx
```suggestion
```
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -1074,12 +1074,19 @@ def trigger_reentry(self, context: Context, event:
dict[str, Any]) -> Any:
"Trigger emitted an %s event, failing the task: %s",
event["status"], event["message"]
)
message = event.get("stack_trace", event["message"])
+ # Push manually before the raise — matches the sync-path
+ # failure-push in cleanup ("Ensure that existing XCom is
+ # pushed even in case of failure").
+ if self.do_xcom_push and xcom_sidecar_output:
+ context["ti"].xcom_push(XCOM_RETURN_KEY,
xcom_sidecar_output)
raise AirflowException(message)
finally:
self._clean(event=event, context=context,
result=xcom_sidecar_output)
- if self.do_xcom_push and xcom_sidecar_output:
- context["ti"].xcom_push(XCOM_RETURN_KEY, xcom_sidecar_output)
+ # Return on success so the task runner's _push_xcom_if_needed handles
+ # return_value and multiple_outputs fan-out, matching execute_sync.
Review Comment:
I think comment not needed as it represents the default.
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]