Hi Dennis, I would like to take over the callback implementation for the Lambda executor.
Thanks, Sameer.

On 2026/03/04 13:22:54 Kevin Yang wrote:
> Hi Dennis,
>
> I would like to support by taking the callback implementation for the
> Kubernetes Executor.
>
> Thanks,
> Kevin Yang
>
> Sent from Outlook for iOS<https://aka.ms/o0ukef>
> ________________________________
> From: Sebastian Daum <[email protected]>
> Sent: Wednesday, March 4, 2026 8:15:22 AM
> To: [email protected] <[email protected]>
> Subject: Re: [DISCUSS] [CALL FOR HELP] Executor Callback support for
> remaining executors + future callback migration
>
> Hi Dennis,
>
> I would be happy to support again and take over the callback implementation
> for the Batch Executor.
>
> Best
> Sebastian
>
> Ferruzzi, Dennis <[email protected]> wrote on Wed., March 4, 2026, 08:53:
>
> > Hey all,
> >
> > Airflow 3.2 introduces ExecutorCallbacks, allowing executors to run
> > synchronous callbacks (starting with Deadline Alerts). LocalExecutor and
> > CeleryExecutor support is done and tested, and I'm looking for volunteers
> > to help implement the remaining executors. There's also an opportunity to
> > help migrate Airflow's existing DAG/task callbacks onto this same
> > infrastructure.
> >
> > ***********************************************
> > WHAT ARE EXECUTOR CALLBACKS?
> > ***********************************************
> >
> > Deadline Alerts allow users to set a deadline on a DAG: "if this DAG isn't
> > done by this time, run this callback." If the callback is asynchronous
> > code, it goes to the Triggerer to execute. If it's synchronous code, then
> > it now (as of PR #61153 [1]) gets sent to the executor and run in the
> > worker, just like a DAG task. The scheduler queues them, the executor
> > picks them up, and a supervised subprocess imports and runs the callback
> > function (that last part about the supervisor is still in PR [2]).
> >
> > ******************
> > WHAT'S DONE
> > ******************
> >
> > - Core framework is all done in PR #61153 [1].
> > There is now an ExecutorWorkload which encompasses ExecuteTask and
> > ExecuteCallback. Some work is still in progress, but callbacks have
> > almost all the same features and "rights" as an executor task.
> >
> > - LocalExecutor is fully supported and tested; it'll launch in 3.2.
> > Callbacks are sent to worker processes alongside tasks, with supervised
> > subprocess execution coming in [2].
> >
> > - CeleryExecutor is fully supported and tested and will also launch in
> > 3.2. Callbacks are sent to Celery workers as workloads and executed via
> > supervise_callback.
> >
> > Those implementations serve as reference examples and should give a pretty
> > good idea of how to handle the remaining work.
> >
> > ******************************
> > WHERE YOU CAN HELP
> > ******************************
> >
> > ****************************
> > FIVE SMALLISH TASKS
> > ****************************
> >
> > Each of the five remaining executors needs:
> >
> > 1. Set supports_callbacks = True on the executor class.
> >
> > 2. Handle ExecuteCallback workloads in _process_workloads(). Right now
> > these executors raise either RuntimeError or NotImplementedError when
> > they see a non-Task workload. We need to implement sending the callback
> > for execution using supervise_callback() from
> > airflow.sdk.execution_time.callback_supervisor.
> >
> > 3. Add tests covering the callback execution path.
> >
> > The specific executors that need work:
> >
> > - Kubernetes Executor (CNCF provider): Needs a way to run
> > supervise_callback in a pod. May need a lightweight callback pod template,
> > or it may be able to reuse the existing worker pod mechanism. I don't know
> > enough about K8s to really gauge how complicated this will be, and I could
> > really use someone with K8s experience to take this one on. I can
> > definitely help with understanding the existing code, but I won't be much
> > help on the actual change implementation.
> >
> > - ECS Executor (Amazon provider): Similar pattern; needs to send the
> > callback to an ECS task.
> >
> > - Batch Executor (Amazon provider): Send the callback to a Batch job.
> >
> > - Lambda Executor (Amazon provider): Lambda is a simpler execution model,
> > but callback import/execution in the Lambda runtime may be a wrinkle.
> >
> > - EdgeExecutor (Edge3 provider): Send the callback to an Edge worker.
> > Like K8s, this is another one I don't have any real experience with, so I
> > can help with onboarding but may not be as much use on the actual
> > implementation.
> >
> > Note: The hybrid executors (CeleryKubernetesExecutor,
> > LocalKubernetesExecutor) are deprecated and do not need to be updated.
> >
> > *************************
> > ONE LARGER TASK
> > *************************
> >
> > In addition to those smaller tasks, there is one bigger one on my "Some
> > Day..." list which is tangentially related. One side goal of the
> > ExecutorCallbacks infrastructure is to unify the other callbacks onto the
> > same framework. This isn't related to Deadline Alerts, and I still have a
> > pile of work to do on that feature, so perhaps I can entice someone to
> > take on the work for that?
> >
> > Airflow's DAG-level and task-level callbacks (on_success_callback,
> > on_failure_callback, etc.) currently flow ((current.... flow? I'll see
> > myself out...)) through a completely separate path. They are packaged as
> > DagCallbackRequest/TaskCallbackRequest and executed in the Dag Processor,
> > which has several downsides. Migrating them to the new (synchronous)
> > ExecutorCallback and/or AsyncCallback framework will let us run them in
> > the worker or triggerer and give them the same level of isolation and
> > support as a task, which the Deadline callbacks now get.
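[Editorial note: to make the "five smallish tasks" checklist above concrete, here is a minimal, self-contained sketch of the pattern. Only supports_callbacks, ExecuteTask, ExecuteCallback, _process_workloads, and supervise_callback come from the thread; the class names, field names, and the stand-in supervise_callback body below are invented for illustration and are NOT the real Airflow provider code.]

```python
from dataclasses import dataclass, field

# Simplified stand-ins for the real workload types; the actual classes
# live in the Airflow task SDK and carry more fields.
@dataclass
class ExecuteTask:
    task_id: str

@dataclass
class ExecuteCallback:
    callback_id: int
    callback_path: str
    callback_kwargs: dict = field(default_factory=dict)

def supervise_callback(callback_id, callback_path, callback_kwargs, log_path=None):
    # Stand-in for airflow.sdk.execution_time.callback_supervisor.
    # The real function forks a supervised subprocess that imports and
    # runs the callback, then returns its exit code.
    print(f"running callback {callback_id}: {callback_path}")
    return 0

class SketchExecutor:
    # Step 1 from the checklist: advertise callback support.
    supports_callbacks = True

    # Step 2: route ExecuteCallback workloads to supervise_callback
    # instead of raising RuntimeError/NotImplementedError.
    def _process_workloads(self, workloads):
        exit_codes = []
        for workload in workloads:
            if isinstance(workload, ExecuteCallback):
                exit_codes.append(
                    supervise_callback(
                        workload.callback_id,
                        workload.callback_path,
                        workload.callback_kwargs,
                    )
                )
            elif isinstance(workload, ExecuteTask):
                exit_codes.append(self._run_task(workload))
            else:
                raise RuntimeError(f"Unknown workload type: {type(workload)}")
        return exit_codes

    def _run_task(self, workload):
        print(f"running task {workload.task_id}")
        return 0
```

For the remote executors, the isinstance branch stays on the scheduler side only as far as serializing and dispatching the workload; the supervise_callback call itself has to happen on the worker (pod, ECS/Batch container, Lambda, or Edge worker), mirroring how ExecuteTask is handled today.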
> >
> > The groundwork is already laid: the new ExecutorCallback framework
> > exists, and the CallbackFetchMethod enum already has a DAG_ATTRIBUTE
> > variant stubbed out for exactly this purpose. The migration would involve:
> >
> > 1. When the scheduler/DagRun detects that a callback is needed (e.g., DAG
> > success/failure, task retry/failure), instead of creating a
> > DagCallbackRequest and sending it to the Dag Processor, create a Callback
> > model record and let the scheduler queue it as an ExecuteCallback workload.
> >
> > 2. Extend the CallbackSubprocess (or supervise_callback, maybe?) to
> > support the DAG_ATTRIBUTE fetch method and resolve the callback from the
> > DAG object's attributes (e.g., dag.on_failure_callback) rather than from
> > an import path.
> >
> > 3. Migrate the callback execution out of dag_processing/processor.py
> > (the _execute_callbacks and related code paths) and into the executor path.
> >
> > 4. Update or deprecate the old
> > DagCallbackRequest/TaskCallbackRequest/CallbackSink infrastructure once
> > the new path is proven.
> >
> > This is a larger piece of work that I haven't planned too deeply, so it's
> > maybe not a "good first task" kind of job, but it would make all callback
> > execution consistent, observable, and logged regardless of which executor
> > you use, and it would remove callback execution load from the Dag
> > Processor. If this interests you, I'd love to collaborate on the design
> > and help however I can.
> >
> > *****************************
> > HOW TO GET STARTED
> > *****************************
> >
> > 1. Look at the LocalExecutor implementation in
> > airflow-core/src/airflow/executors/local_executor.py, specifically the
> > _execute_workload function and the ExecuteCallback branch. This is the
> > simplest reference.
> >
> > 2. Look at the CeleryExecutor implementation in providers/celery/ for an
> > example of a remote/distributed executor handling callbacks.
> >
> > 3.
> > The key function you'll call is supervise_callback() from
> > airflow.sdk.execution_time.callback_supervisor. It takes a callback id,
> > callback_path, callback_kwargs, and an optional log_path. It forks a
> > supervised subprocess and returns an exit code.
> >
> > 4. For remote executors (K8s, ECS, Batch, Lambda), the challenge is
> > making sure supervise_callback runs on the remote worker side, not in the
> > scheduler. The pattern will be similar to how ExecuteTask workloads are
> > handled, but will be slightly different for each executor.
> >
> > ***************************
> > HOW TO VOLUNTEER
> > ***************************
> >
> > Reply to this thread (or the corresponding GitHub issue) with the
> > executor or work item you'd like to take on. One executor per volunteer
> > is ideal so we can parallelize the work. I'm happy to review PRs and
> > answer questions. I'll be working on related features (task-level
> > Deadlines, Asset-based and Event-based Deadlines) in parallel.
> >
> > Even if you're not an executor expert, this feels like a pretty
> > well-scoped contribution with clear examples to follow, an active guide
> > (that's me!), and it shouldn't be terribly complicated. It would be
> > wonderful if they could all be done and ready for the next wave of
> > provider package releases, so the rest of the executors can follow
> > quickly behind the first two, if not at the same time.
> >
> > Thanks!
> >
> > [1] https://github.com/apache/airflow/pull/61153
> > [2] https://github.com/apache/airflow/pull/62645
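[Editorial note: steps 3–4 of "HOW TO GET STARTED" describe calling supervise_callback() on the remote worker side. Below is a hypothetical sketch of what a remote-worker entrypoint (a K8s pod command, ECS/Batch container, or Lambda handler) might do with a serialized ExecuteCallback payload. Only the supervise_callback argument list (callback id, callback_path, callback_kwargs, optional log_path) comes from the thread; the function name, payload format, and injectable `supervise` parameter are invented for illustration.]

```python
import json

def run_remote_callback(payload_json, supervise=None):
    """Decode a serialized callback payload and run it under supervision.

    `supervise` defaults to the real supervise_callback; it is injectable
    here only so this sketch stays self-contained and testable.
    """
    if supervise is None:
        # Deferred import so the sketch runs without Airflow installed.
        from airflow.sdk.execution_time.callback_supervisor import (
            supervise_callback as supervise,
        )
    payload = json.loads(payload_json)
    # Hand the callback to the supervisor, which forks a subprocess,
    # imports the callback, runs it, and returns the exit code.
    return supervise(
        payload["callback_id"],
        payload["callback_path"],
        payload.get("callback_kwargs", {}),
        log_path=payload.get("log_path"),
    )
```

The per-executor work is then mostly about how the payload travels: as part of the pod spec/command for K8s, the container overrides for ECS/Batch, the invocation event for Lambda, or the queued job for Edge.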
