ferruzzi commented on code in PR #62645:
URL: https://github.com/apache/airflow/pull/62645#discussion_r2875336342
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -209,9 +209,14 @@ def execute_workload(input: str) -> None:
log_path=workload.log_path,
)
elif isinstance(workload, workloads.ExecuteCallback):
- success, error_msg = execute_callback_workload(workload.callback, log)
- if not success:
- raise RuntimeError(error_msg or "Callback execution failed")
+ exit_code = supervise_callback(
+ id=workload.callback.id,
+ callback_path=workload.callback.data.get("path", ""),
+ callback_kwargs=workload.callback.data.get("kwargs", {}),
+ log_path=workload.log_path,
+ )
+ if exit_code != 0:
+ raise RuntimeError(f"Callback subprocess exited with code
{exit_code}")
Review Comment:
That actually goes back to the question below. Tasks use the decoder you
asked about to call the (Succeed|Retry|Defer|etc)Task endpoints which in turn
handle the logic. Since we don't have (and maybe don't want, TBD) that
infrastructure on a callback process, then this is the alternative I came up
with.
We could, in theory, extend this by adding the comms/decoder and having an
API endpoint handle the callback state transitions. Which isn't a bad idea,
but depends on the above email.
~If you want, I can move this logging into supervise_callback for the same
effect without the apparent mismatch between the two paths? I don't believe
that would have any real difference on the actual execution but might maybe
look cleaner?~
EDIT - You made the same suggestion that I did, I'll move this into the
supervise_callback
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]