Hi Dennis,

I would be happy to help again and take over the callback implementation
for the Batch Executor.

Best
Sebastian

On Wed, 4 Mar 2026, 08:53, Ferruzzi, Dennis <[email protected]> wrote:

> Hey all,
>
> Airflow 3.2 introduces ExecutorCallbacks, allowing executors to run
> synchronous callbacks (starting with Deadline Alerts).  LocalExecutor and
> CeleryExecutor support is done and tested, and I'm looking for volunteers
> to help implement support in the remaining executors.  There's also an
> opportunity to help migrate Airflow's existing DAG/task callbacks onto this
> same infrastructure.
>
>
> ***********************************************
> WHAT ARE EXECUTOR CALLBACKS?
> ***********************************************
>
> Deadline Alerts allow users to set a deadline on a DAG: "if this DAG isn't
> done by this time, run this callback."  If the callback is asynchronous, it
> is sent to the Triggerer to execute.  If it is synchronous, it is now (as of
> PR #61153 [1]) sent to the executor and run on a worker, just like a Dag
> task.  The scheduler queues these callbacks, the executor picks them up, and
> a supervised subprocess imports and runs the callback function (that last
> part, the supervisor, is still pending in PR #62645 [2]).
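A toy model of the async/sync split described above.  `route_callback` and `Destination` are hypothetical names, and this sketch infers "synchronous" by inspecting the function with `inspect.iscoroutinefunction`; Airflow itself may decide differently (for example, from how the callback is declared on the Deadline Alert), so treat it as an illustration of the routing idea only.

```python
import asyncio
import inspect
from enum import Enum
from typing import Callable


class Destination(Enum):
    # Hypothetical labels for where a Deadline Alert callback would run.
    TRIGGERER = "triggerer"
    EXECUTOR = "executor"


def route_callback(callback: Callable) -> Destination:
    """Toy routing rule (an assumption, not Airflow's logic): coroutine
    functions go to the Triggerer, plain callables go to the executor."""
    if inspect.iscoroutinefunction(callback):
        return Destination.TRIGGERER
    return Destination.EXECUTOR


async def notify_async(**kwargs):
    # Asynchronous callback: would be awaited by the Triggerer.
    await asyncio.sleep(0)


def notify_sync(**kwargs):
    # Synchronous callback: would be shipped to a worker like a task.
    print("deadline missed:", kwargs)


print(route_callback(notify_async))  # Destination.TRIGGERER
print(route_callback(notify_sync))   # Destination.EXECUTOR
```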
>
>
> ******************
> WHAT'S DONE
> ******************
>
>   - The core framework is done in PR #61153 [1].  There is now an
> ExecutorWorkload, which encompasses ExecuteTask and ExecuteCallback.  Some
> work is still in progress, but callbacks have almost all the same features
> and "rights" as an executor task.
>   - LocalExecutor is fully supported and tested and will launch in 3.2.
> Callbacks are sent to worker processes alongside tasks, with supervised
> subprocess execution coming in [2].
>   - CeleryExecutor is fully supported and tested and will also launch in
> 3.2.  Callbacks are sent to Celery workers as workloads, executed via
> supervise_callback.
>
> Those implementations serve as reference examples and should give a pretty
> good idea of how to handle the remaining work.
>
>
> ******************************
> WHERE YOU CAN HELP
> ******************************
>
>
> ****************************
> FIVE SMALLISH TASKS
> ****************************
>
> Each of the five remaining executors needs:
>   1. Set supports_callbacks = True on the executor class.
>
>   2. Handle ExecuteCallback workloads in _process_workloads().  Right now
> these executors raise either RuntimeError or NotImplementedError when they
> see a non-Task workload.  Instead, they need to send the callback to a
> worker, where it is executed with supervise_callback() from
> airflow.sdk.execution_time.callback_supervisor.
>
>   3. Add tests covering the callback execution path.
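A minimal sketch of steps 1 and 2 for a remote executor, assuming simplified stand-in workload classes (the real ExecuteTask/ExecuteCallback models carry more fields, and the field names here are assumptions).  `ToyRemoteExecutor`, `_send_task_to_worker`, and `_send_callback_to_worker` are hypothetical; in a real executor the send methods would submit a pod, ECS task, Batch job, Lambda invocation, or Edge job.

```python
from dataclasses import dataclass, field
from typing import Any


# Stand-ins for the real workload classes; field names are assumptions.
@dataclass
class ExecuteTask:
    ti_key: str


@dataclass
class ExecuteCallback:
    callback_id: str
    callback_path: str
    callback_kwargs: dict[str, Any] = field(default_factory=dict)


class ToyRemoteExecutor:
    # Step 1: advertise callback support.
    supports_callbacks = True

    def __init__(self) -> None:
        self.sent: list[tuple[str, object]] = []

    def _process_workloads(self, workloads: list) -> None:
        # Step 2: dispatch both workload kinds instead of rejecting callbacks.
        for workload in workloads:
            if isinstance(workload, ExecuteTask):
                self._send_task_to_worker(workload)
            elif isinstance(workload, ExecuteCallback):
                self._send_callback_to_worker(workload)
            else:
                raise RuntimeError(f"Unknown workload type: {type(workload).__name__}")

    # Hypothetical transport hooks; here they only record what was sent.
    def _send_task_to_worker(self, workload: ExecuteTask) -> None:
        self.sent.append(("task", workload))

    def _send_callback_to_worker(self, workload: ExecuteCallback) -> None:
        # On the worker side, this workload would be run via supervise_callback().
        self.sent.append(("callback", workload))
```

For step 3, a test can feed a mixed list of workloads to `_process_workloads()` and assert that the callback reached the callback branch instead of raising.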
>
> The specific executors that need work:
>
>   - Kubernetes Executor (CNCF provider): Needs a way to run
> supervise_callback in a pod.  This may need a lightweight callback pod
> template, or it may reuse the existing worker pod mechanism.  I don't know
> enough about K8s to judge how complicated this will be, and I could really
> use someone with K8s experience to take this one on.  I can definitely help
> explain the existing code, but I won't be much help with implementing the
> actual change.
>
>   - ECS Executor (Amazon provider): Similar pattern; needs to send the
> callback to an ECS task.
>
>   - Batch Executor (Amazon provider): Send the callback to a Batch job.
>
>   - Lambda Executor (Amazon provider): Lambda is a simpler execution
> model, but callback import/execution in the Lambda runtime may be a wrinkle.
>
>   - EdgeExecutor (Edge3 provider): Send the callback to an Edge worker.
> Like K8s, this is another one I don't have any real experience with, so I
> can help with onboarding but may not be much use on the actual
> implementation.
>
> Note: The hybrid executors (CeleryKubernetesExecutor,
> LocalKubernetesExecutor) are deprecated and do not need to be updated.
>
>
> *************************
> ONE LARGER TASK
> *************************
>
> In addition to those smaller tasks, there is one bigger one on my "Some
> Day..." list which is tangentially related.  One side-goal of the
> ExecutorCallbacks infrastructure is to unify the other callbacks onto the
> same framework.  This isn't part of Deadline Alerts, and I still have a
> pile of work to do on that feature, so perhaps I can entice someone to take
> this on?
>
> Airflow's DAG-level and task-level callbacks (on_success_callback,
> on_failure_callback, etc.) currently flow ((current... flow?  I'll see
> myself out...)) through a completely separate path.  They are packaged as
> DagCallbackRequest/TaskCallbackRequest and executed in the Dag Processor,
> which has several downsides, not least the extra load it puts on the Dag
> Processor.  Migrating them to the new (synchronous) ExecutorCallback and/or
> AsyncCallback framework will let us run them on a worker or the Triggerer
> and give them the same level of isolation and support as a task, which the
> Deadline callbacks now get.
>
> The groundwork is already laid: the new ExecutorCallback framework is in
> place, and the CallbackFetchMethod enum already has a DAG_ATTRIBUTE variant
> stubbed out for exactly this purpose.  The migration would involve:
>
>   1. When the scheduler/DagRun detects a callback is needed (e.g., DAG
> success/failure, task retry/failure), instead of creating a
> DagCallbackRequest and sending it to the Dag Processor, create a Callback
> model record and let the scheduler queue it as an ExecuteCallback workload.
>
>   2. Extend the CallbackSubprocess (or maybe supervise_callback) to
> support the DAG_ATTRIBUTE fetch method and resolve the callback from the
> DAG object's attributes (e.g., dag.on_failure_callback) rather than from an
> import path.
>
>   3. Migrate the callback execution out of dag_processing/processor.py
> (the _execute_callbacks and related code paths) and into the executor path.
>
>   4. Update or deprecate the old
> DagCallbackRequest/TaskCallbackRequest/CallbackSink infrastructure once the
> new path is proven.
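One way step 2 could look, as a hedged sketch: a resolver that accepts either an import path or a DAG attribute name.  `FetchMethod` stands in for CallbackFetchMethod (only DAG_ATTRIBUTE comes from the text above; IMPORT_PATH is an assumed name), and `resolve_callbacks` is hypothetical.  Because DAG/task callback attributes can hold a single callable or a list of them, the result is normalized to a list.

```python
import importlib
from enum import Enum
from typing import Any, Callable


class FetchMethod(Enum):
    # Stand-in for CallbackFetchMethod. DAG_ATTRIBUTE is from the text;
    # IMPORT_PATH is an assumed name for the existing import-path behaviour.
    IMPORT_PATH = "import_path"
    DAG_ATTRIBUTE = "dag_attribute"


def resolve_callbacks(method: FetchMethod, ref: str, dag: Any = None) -> list[Callable]:
    """Return the callables to run for a callback reference (hypothetical helper)."""
    if method is FetchMethod.IMPORT_PATH:
        # "package.module.function" -> import the module, then fetch the attribute.
        module_path, _, attr = ref.rpartition(".")
        return [getattr(importlib.import_module(module_path), attr)]
    if method is FetchMethod.DAG_ATTRIBUTE:
        if dag is None:
            raise ValueError("DAG_ATTRIBUTE callbacks need the parsed DAG object")
        value = getattr(dag, ref, None)
        if value is None:
            return []  # No callback configured for this event.
        # Attributes may hold one callable or a list/tuple of callables.
        return list(value) if isinstance(value, (list, tuple)) else [value]
    raise ValueError(f"Unsupported fetch method: {method!r}")
```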
>
> This is a larger piece of work that I haven't planned too deeply, so it's
> maybe not a "good first task" kind of job, but it would make all callback
> execution consistent, observable, and logged regardless of which executor
> you use, and it would remove callback execution load from the Dag
> Processor.  If this interests you, I'd love to collaborate on the design
> and help however I can.
>
>
> *****************************
> HOW TO GET STARTED
> *****************************
>
>   1. Look at the LocalExecutor implementation in
> airflow-core/src/airflow/executors/local_executor.py, specifically the
> _execute_workload function and the ExecuteCallback branch. This is the
> simplest reference.
>
>   2. Look at the CeleryExecutor implementation in providers/celery/ for an
> example of a remote/distributed executor handling callbacks.
>
>   3. The key function you'll call is supervise_callback() from
> airflow.sdk.execution_time.callback_supervisor.  It takes a callback id,
> callback_path, callback_kwargs, and an optional log_path.  It forks a
> supervised subprocess and returns an exit code.
>
>   4. For remote executors (K8s, ECS, Batch, Lambda), the challenge is
> making sure supervise_callback runs on the remote worker side, not in the
> scheduler. The pattern will be similar to how ExecuteTask workloads are
> handled, but will be slightly different for each executor.
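A sketch of the split described in steps 3 and 4: the scheduler side only serializes the callback, and the worker side decodes it and runs it in a child process that reports an exit code.  Everything here is hypothetical: `CallbackPayload`, `build_remote_command`, the `my_executor.callback_entrypoint` module name, and `run_callback_in_subprocess`, a toy stand-in for supervise_callback() that launches a fresh interpreter with `subprocess` instead of forking and does none of the real supervision.  The positional argument order passed to `run` follows the parameter list above and is an assumption about the real signature.

```python
import json
import subprocess
import sys
from dataclasses import asdict, dataclass, field
from typing import Any, Callable, Optional

# Child-process script: import the callable by dotted path and call it with kwargs.
# An uncaught exception makes the child exit with a non-zero code.
_CHILD = r"""
import importlib, json, sys
path, kwargs = sys.argv[1], json.loads(sys.argv[2])
module_path, _, attr = path.rpartition(".")
getattr(importlib.import_module(module_path), attr)(**kwargs)
"""


@dataclass
class CallbackPayload:
    # Hypothetical JSON-friendly view of an ExecuteCallback workload.
    callback_id: str
    callback_path: str
    callback_kwargs: dict[str, Any] = field(default_factory=dict)


def build_remote_command(payload: CallbackPayload) -> list[str]:
    """Scheduler side: serialize and ship only; the callback is never run here."""
    # "my_executor.callback_entrypoint" is a hypothetical module name.
    return ["python", "-m", "my_executor.callback_entrypoint", json.dumps(asdict(payload))]


def run_callback_in_subprocess(
    callback_id: str,
    callback_path: str,
    callback_kwargs: dict[str, Any],
    log_path: Optional[str] = None,
) -> int:
    """Toy stand-in for supervise_callback(): run the callback in a child
    process and return its exit code (0 on success)."""
    cmd = [sys.executable, "-c", _CHILD, callback_path, json.dumps(callback_kwargs)]
    if log_path is None:
        return subprocess.run(cmd).returncode
    with open(log_path, "a", encoding="utf-8") as log:
        log.write(f"[{callback_id}] running {callback_path}\n")
        log.flush()
        return subprocess.run(cmd, stdout=log, stderr=subprocess.STDOUT).returncode


def remote_entrypoint(argv: list[str], run: Callable[..., int] = run_callback_in_subprocess) -> int:
    """Worker side: decode the payload from the last argument and run it."""
    payload = CallbackPayload(**json.loads(argv[-1]))
    # Assumption: (id, path, kwargs) positional order mirrors supervise_callback().
    return run(payload.callback_id, payload.callback_path, payload.callback_kwargs)
```

On a real worker, `remote_entrypoint` would receive supervise_callback() as `run`; injecting it keeps the sketch runnable without Airflow installed.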
>
>
> ***************************
> HOW TO VOLUNTEER
> ***************************
>
> Reply to this thread (or the corresponding GitHub issue) with which
> executor or work item you'd like to take on. One executor per volunteer is
> ideal so we can parallelize the work. I'm happy to review PRs and answer
> questions.  I'll be working on related features (Task-level Deadlines,
> Asset-based and Event-based Deadlines) in parallel.
>
> Even if you're not an executor expert, this feels like a pretty
> well-scoped contribution with clear examples to follow, an active guide
> (that's me!), and it shouldn't be terribly complicated.  It would be
> wonderful if they could all be done and ready for the next wave of provider
> package releases, so the rest of the executors can follow quickly behind
> the first two, if not ship at the same time.
>
> Thanks!
>
> [1] https://github.com/apache/airflow/pull/61153
> [2] https://github.com/apache/airflow/pull/62645
>
