Hi Dennis,

Since Kevin is taking the Kubernetes Executor (please disregard my Slack
message), I can take over the callback implementation for the Lambda Executor.
If possible, I could take the ECS Executor as well.

Thanks,
Sameer.

On 2026/03/04 13:22:54 Kevin Yang wrote:
> Hi Dennis,
>
> I would like to support by taking the callback implementation for Kubernetes 
> Executor.
>
> Thanks,
> Kevin Yang
>
> ________________________________
> From: Sebastian Daum <[email protected]>
> Sent: Wednesday, March 4, 2026 8:15:22 AM
> To: [email protected] <[email protected]>
> Subject: Re: [DISCUSS] [CALL FOR HELP] Executor Callback support for 
> remaining executors + future callback migration
>
> Hi Dennis,
>
> I would be happy to support again and take over the Callback implementation
> for the Batch Executor.
>
> Best
> Sebastian
>
> Ferruzzi, Dennis <[email protected]> wrote on Wed, Mar 4, 2026, 08:53:
>
> > Hey all,
> >
> > Airflow 3.2 introduces ExecutorCallbacks, allowing executors to run
> > synchronous callbacks (starting with Deadline Alerts). LocalExecutor and
> > CeleryExecutor support is done and tested and I'm looking for volunteers to
> > help implement the remaining executors. There's also an opportunity to
> > help migrate Airflow's existing DAG/task callbacks onto this same
> > infrastructure.
> >
> >
> > ***********************************************
> > WHAT ARE EXECUTOR CALLBACKS?
> > ***********************************************
> >
> > Deadline Alerts allow users to set a deadline on a DAG: "if this DAG isn't
> > done by this time, run this callback." If the callback is asynchronous
> > code, it goes to the Triggerer to execute. If it's synchronous code then
> > it now (as of PR #61153 [1]) gets sent to the executor and run in the
> > worker, just like a Dag task. The scheduler queues them, the executor
> > picks them up, and a supervised subprocess imports and runs the callback
> > function (that last part about the supervisor is still in PR [2]).
> >
> >
> > ******************
> > WHAT'S DONE
> > ******************
> >
> > - Core framework is all done in PR #61153 [1]. There is now an
> > ExecutorWorkload which encompasses ExecuteTask and ExecuteCallback. Some
> > work is still in progress, but Callbacks have almost all the same features
> > and "rights" as an executor Task.
> > - LocalExecutor is fully supported and tested; it'll launch in 3.2.
> > Callbacks are sent to worker processes alongside tasks, with supervised
> > subprocess execution coming [2].
> > - CeleryExecutor is fully supported and tested and will also launch in
> > 3.2. Callbacks are sent to Celery workers as workloads, executed via
> > supervise_callback.
> >
> > Those implementations serve as reference examples and should give a pretty
> > good idea how to handle the remaining work.
> >
> >
> > ******************************
> > WHERE YOU CAN HELP
> > ******************************
> >
> >
> > ****************************
> > FIVE SMALLISH TASKS
> > ****************************
> >
> > For each of the five remaining executors, we need to:
> > 1. Set supports_callbacks = True on the executor class.
> >
> > 2. Handle ExecuteCallback workloads in _process_workloads(). Right now
> > these executors raise either RuntimeError or NotImplementedError when they
> > see a non-Task workload. We need to implement sending the callback for
> > execution using supervise_callback() from
> > airflow.sdk.execution_time.callback_supervisor.
> >
> > 3. Add tests covering the callback execution path.
> >
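A rough sketch of those three steps, using stand-in classes rather than the real Airflow ones. The field names on ExecuteTask and ExecuteCallback, the SketchRemoteExecutor class, and the `submitted` list are all hypothetical; only supports_callbacks and _process_workloads() come from the description above.

```python
from dataclasses import dataclass, field


@dataclass
class ExecuteTask:
    """Stand-in for the task workload (hypothetical fields)."""
    task_key: str


@dataclass
class ExecuteCallback:
    """Stand-in for the callback workload (hypothetical fields)."""
    callback_id: str
    callback_path: str
    callback_kwargs: dict = field(default_factory=dict)


class SketchRemoteExecutor:
    # Step 1: advertise callback support on the executor class.
    supports_callbacks = True

    def __init__(self):
        # Records what would be handed to the remote backend (illustration only).
        self.submitted = []

    def _process_workloads(self, workloads):
        # Step 2: route both workload types instead of raising on non-Task ones.
        for workload in workloads:
            if isinstance(workload, ExecuteTask):
                self.submitted.append(("task", workload.task_key))
            elif isinstance(workload, ExecuteCallback):
                # A real executor would ship this to the remote worker, which
                # would then run the callback via supervise_callback() there.
                self.submitted.append(("callback", workload.callback_id))
            else:
                raise NotImplementedError(
                    f"Unsupported workload: {type(workload).__name__}"
                )
```

Step 3 then amounts to a test that feeds an ExecuteCallback through _process_workloads() and checks that it was routed to the backend instead of raising.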
> > The specific executors that need work:
> >
> > - Kubernetes Executor (CNCF provider): Needs a way to run
> > supervise_callback in a pod. May need a lightweight callback pod template
> > or reuse the existing worker pod mechanism. I don't know enough about K8S
> > to really call how complicated this will be and I can really use someone
> > with K8S experience to take this one on. I can definitely help understand
> > the existing code, but I won't be much help on the actual change
> > implementation.
> >
> > - ECS Executor (Amazon provider): Similar pattern, needs to send the
> > callback to an ECS task.
> >
> > - Batch Executor (Amazon provider): Send the callback to a Batch job.
> >
> > - Lambda Executor (Amazon provider): Lambda is a simpler execution
> > model, but callback import/execution in the Lambda runtime may be a wrinkle.
> >
> > - EdgeExecutor (Edge3 provider): Send the callback to an Edge worker.
> > Like K8S, this is another I don't have any real experience with so I can
> > help with onboarding but may not be as much use on the actual
> > implementation.
> >
> > Note: The hybrid executors (CeleryKubernetesExecutor,
> > LocalKubernetesExecutor) are deprecated and do not need to be updated.
> >
> >
> > *************************
> > ONE LARGER TASK
> > *************************
> >
> > In addition to those smaller tasks, there is one bigger one on my "Some
> > Day..." list which is tangentially related. One side-goal of the
> > ExecutorCallbacks infrastructure is to unify the other callbacks onto the
> > same framework. This isn't related to DeadlineAlerts and I still have a
> > pile of work to do on that feature, so perhaps I can entice someone to take
> > on the work for that?
> >
> > Airflow's DAG-level and task-level callbacks (on_success_callback,
> > on_failure_callback, etc) currently flow ((current.... flow? I'll see
> > myself out...)) through a completely separate path. They are packaged as
> > DagCallbackRequest/TaskCallbackRequest and executed in the Dag Processor,
> > which has several downsides. Migrating them to the new (synchronous)
> > ExecutorCallback and/or AsyncCallback framework will let us run them in the
> > worker or triggerer and let us give them the same level of isolation and
> > support as a task, which the Deadline callbacks now get.
> >
> > The groundwork is already laid with the new ExecutorCallback framework and
> > the CallbackFetchMethod enum already has a DAG_ATTRIBUTE variant stubbed
> > out for exactly this purpose. The migration would involve:
> >
> > 1. When the scheduler/DagRun detects a callback is needed (e.g., DAG
> > success/failure, task retry/failure), instead of creating a
> > DagCallbackRequest and sending it to the Dag Processor, create a Callback
> > model record and let the scheduler queue it as an ExecuteCallback workload.
> >
> > 2. Extend the CallbackSubprocess (or supervise_callback maybe??) to
> > support the DAG_ATTRIBUTE fetch method and resolve the callback from the
> > DAG object's attributes (e.g., dag.on_failure_callback) rather than from an
> > import path.
> >
> > 3. Migrate the callback execution out of dag_processing/processor.py
> > (the _execute_callbacks and related code paths) and into the executor path.
> >
> > 4. Update or deprecate the old
> > DagCallbackRequest/TaskCallbackRequest/CallbackSink infrastructure once the
> > new path is proven.
> >
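To make step 2 of the migration a bit more concrete, here is a minimal sketch of resolving a callback either from an import path or from an attribute on the DAG object. FetchMethod is a stand-in for CallbackFetchMethod; only DAG_ATTRIBUTE is named in this thread, so the IMPORT_PATH member, the resolve_callback() helper, and its parameters are assumptions for illustration.

```python
import importlib
from enum import Enum
from types import SimpleNamespace


class FetchMethod(Enum):
    """Stand-in enum; only DAG_ATTRIBUTE is taken from the thread."""
    IMPORT_PATH = "import_path"  # assumed member name
    DAG_ATTRIBUTE = "dag_attribute"


def resolve_callback(method, path, dag=None):
    """Return the callable that `path` refers to (hypothetical helper)."""
    if method is FetchMethod.IMPORT_PATH:
        # "package.module.func" -> import package.module, then fetch func.
        module_name, _, attr = path.rpartition(".")
        return getattr(importlib.import_module(module_name), attr)
    if method is FetchMethod.DAG_ATTRIBUTE:
        # Here `path` is an attribute name such as "on_failure_callback".
        if dag is None:
            raise ValueError("DAG_ATTRIBUTE resolution needs a DAG object")
        return getattr(dag, path)
    raise ValueError(f"Unknown fetch method: {method!r}")


# Usage with a fake DAG object standing in for a real one.
fake_dag = SimpleNamespace(on_failure_callback=lambda context: "handled")
callback = resolve_callback(FetchMethod.DAG_ATTRIBUTE, "on_failure_callback", fake_dag)
```

Whether this branching belongs in CallbackSubprocess or in supervise_callback is exactly the open question in step 2; the sketch only shows the lookup itself.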
> > This is a larger piece of work that I haven't planned too deeply, so it's
> > maybe not a "good first task" kind of job, but it would make all callback
> > execution consistent, observable, and logged regardless of which executor
> > you use and it would remove callback execution load from the Dag Processor.
> > If this interests you, I'd love to collaborate on the design and help
> > however I can.
> >
> >
> > *****************************
> > HOW TO GET STARTED
> > *****************************
> >
> > 1. Look at the LocalExecutor implementation in
> > airflow-core/src/airflow/executors/local_executor.py, specifically the
> > _execute_workload function and the ExecuteCallback branch. This is the
> > simplest reference.
> >
> > 2. Look at the CeleryExecutor implementation in providers/celery/ for an
> > example of a remote/distributed executor handling callbacks.
> >
> > 3. The key function you'll call is supervise_callback() from
> > airflow.sdk.execution_time.callback_supervisor. It takes a callback id,
> > callback_path, callback_kwargs, and an optional log_path, forks a
> > supervised subprocess, and returns an exit code.
> >
> > 4. For remote executors (K8s, ECS, Batch, Lambda), the challenge is
> > making sure supervise_callback runs on the remote worker side, not in the
> > scheduler. The pattern will be similar to how ExecuteTask workloads are
> > handled, but will be slightly different for each executor.
> >
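For anyone who has not looked at the supervisor yet, the general shape of "import a callback by path, run it in a child process, report an exit code" can be sketched with the standard library alone. This is not supervise_callback() itself: run_callback_in_child() and its parameters are hypothetical, it spawns a fresh interpreter rather than forking, and it skips the logging and supervision the real function provides.

```python
import json
import subprocess
import sys

# Program run by the child interpreter: import the callable named by the
# dotted path in argv[1] and call it with the JSON-encoded kwargs in argv[2].
_CHILD_PROGRAM = """
import importlib, json, sys
path, kwargs = sys.argv[1], json.loads(sys.argv[2])
module_name, _, attr = path.rpartition(".")
getattr(importlib.import_module(module_name), attr)(**kwargs)
"""


def run_callback_in_child(callback_path, callback_kwargs, timeout=60):
    """Run a callback in a separate process and return its exit code.

    Hypothetical helper for illustration; kwargs must be JSON-serializable.
    """
    result = subprocess.run(
        [sys.executable, "-c", _CHILD_PROGRAM, callback_path, json.dumps(callback_kwargs)],
        timeout=timeout,
    )
    # 0 means the callback returned normally; an uncaught exception in the
    # child makes the interpreter exit with a non-zero code.
    return result.returncode
```

For the remote executors, the point of item 4 is that a call like this has to happen inside the pod, ECS task, Batch job, or Lambda invocation, with the scheduler side only submitting the workload.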
> >
> > ***************************
> > HOW TO VOLUNTEER
> > ***************************
> >
> > Reply to this thread (or the corresponding GitHub issue) with which
> > executor or work item you'd like to take on. One executor per volunteer is
> > ideal so we can parallelize the work. I'm happy to review PRs and answer
> > questions. I'll be working on related features (Task-level Deadlines,
> > Asset-based and Event-based Deadlines) in parallel.
> >
> > Even if you're not an executor expert, this feels like a pretty
> > well-scoped contribution with clear examples to follow, an active guide
> > (that's me!), and it shouldn't be terribly complicated. It would be
> > wonderful if they can all be done and ready for the next wave of provider
> > package releases so the rest of the executors can follow quickly behind the
> > first two, if not at the same time.
> >
> > Thanks!
> >
> > [1] https://github.com/apache/airflow/pull/61153
> > [2] https://github.com/apache/airflow/pull/62645
> >
>
