ferruzzi opened a new issue, #62887: URL: https://github.com/apache/airflow/issues/62887
### Body

This is a tracking ticket for the email thread [here](https://lists.apache.org/thread/ssb54kvffc4q95h2ch9j95v10kokmwv5)

## Work Remaining:

| Executor | Assignee | Status | PR Link |
|----------|----------|--------|---------|
| Batch | Sebastian Daum @dondaum | CLAIMED | |
| ECS | Shiva Rastogi @shivaam | CLAIMED | |
| Edge | Jeongwoo Do @wjddn279 | CLAIMED | |
| K8S | Kevin Yang | CLAIMED | |
| Lambda | Sameer Mesiah @SameerMesiah97 | CLAIMED | |

- [ ] Convert existing Dag Processor callbacks

A note regarding "claimed" work: this got a lot more attention than I expected, so I'm going to put down the names for claimed executors in the order the emails came in. Once I see a PR (please tag me), I'll move it from "claimed" to "work in progress" (WIP). In what I believe to be the spirit of the new assignee policy, if I don't see any progress after a week I'm going to open it back up for someone else. I'm not expecting completion in a week, but I do need to see that you are actively working on it, even if that is just asking me questions on Slack.

State flow: AVAILABLE -> CLAIMED -> WIP -> PR -> DONE

___
Below the line is just copypasta of the email, for posterity
___

Airflow 3.2 introduces ExecutorCallbacks, allowing executors to run synchronous callbacks (starting with Deadline Alerts). LocalExecutor and CeleryExecutor support is done and tested, and I'm looking for volunteers to help implement the remaining executors. There's also an opportunity to help migrate Airflow's existing DAG/task callbacks onto this same infrastructure.

## WHAT ARE EXECUTOR CALLBACKS?

Deadline Alerts allow users to set a deadline on a DAG: "if this DAG isn't done by this time, run this callback." If the callback is asynchronous code, it goes to the Triggerer to execute. If it's synchronous code, then it now (as of PR [#61153](https://github.com/apache/airflow/pull/61153)) gets sent to the executor and run in the worker, just like a Dag task.
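In the synchronous case, a deadline callback is just an ordinary Python callable that the worker imports by its dotted path and invokes with stored keyword arguments. A minimal hypothetical example (the function name and parameters here are invented for illustration, not from the Airflow codebase):

```python
# Hypothetical synchronous deadline callback. The executor machinery
# imports a function like this by its dotted path and calls it with the
# kwargs stored on the callback record; nothing async is involved.

def notify_on_missed_deadline(dag_id: str, deadline: str, **context) -> None:
    """Runs in a supervised worker subprocess when the DAG misses its deadline."""
    print(f"DAG {dag_id} missed its deadline of {deadline}")
```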
The scheduler queues them, the executor picks them up, and a supervised subprocess imports and runs the callback function (that last part about the supervisor is [still in PR](https://github.com/apache/airflow/pull/62645)).

## WHAT'S DONE

- Core framework is all done in PR [#61153](https://github.com/apache/airflow/pull/61153). There is now an ExecutorWorkload which encompasses ExecuteTask and ExecuteCallback. Some work is still in progress, but Callbacks have almost all the same features and "rights" as an executor Task.
- LocalExecutor is fully supported and tested; it'll launch in 3.2. Callbacks are sent to worker processes alongside tasks, with supervised subprocess execution [coming](https://github.com/apache/airflow/pull/62645).
- CeleryExecutor is fully supported and tested and will also launch in 3.2. Callbacks are sent to Celery workers as workloads and executed via supervise_callback.

Those implementations serve as reference examples and should give a pretty good idea of how to handle the remaining work.

# WHERE YOU CAN HELP

## FIVE SMALLISH TASKS

Each of the five remaining executors needs:

1. Set supports_callbacks = True on the executor class.
2. Handle ExecuteCallback workloads in _process_workloads(). Right now these executors raise either RuntimeError or NotImplementedError when they see a non-Task workload. We need to implement sending the callback for execution using supervise_callback() from airflow.sdk.execution_time.callback_supervisor.
3. Add tests covering the callback execution path.

The specific executors that need work:

- Kubernetes Executor (CNCF provider): Needs a way to run supervise_callback in a pod. May need a lightweight callback pod template, or it may be able to reuse the existing worker pod mechanism. I don't know enough about K8S to really call how complicated this will be, and I could really use someone with K8S experience to take this one on. I can definitely help with understanding the existing code, but I won't be much help on the actual change implementation.
- ECS Executor (Amazon provider): Similar pattern; needs to send the callback to an ECS task.
- Batch Executor (Amazon provider): Send the callback to a Batch job.
- Lambda Executor (Amazon provider): Lambda is a simpler execution model, but callback import/execution in the Lambda runtime may be a wrinkle.
- EdgeExecutor (Edge3 provider): Send the callback to an Edge worker. Like K8S, this is another one I don't have any real experience with, so I can help with onboarding but may not be as much use on the actual implementation.

Note: The hybrid executors (CeleryKubernetesExecutor, LocalKubernetesExecutor) are deprecated and do not need to be updated.

## ONE LARGER TASK

In addition to those smaller tasks, there is one bigger one on my "Some Day..." list which is tangentially related. One side-goal of the ExecutorCallbacks infrastructure is to unify the other callbacks onto the same framework. This isn't related to DeadlineAlerts, and I still have a pile of work to do on that feature, so perhaps I can entice someone to take on the work for that?

Airflow's DAG-level and task-level callbacks (on_success_callback, on_failure_callback, etc.) currently flow ((current.... flow? I'll see myself out...)) through a completely separate path. They are packaged as DagCallbackRequest/TaskCallbackRequest and executed in the Dag Processor, which has several downsides. Migrating them to the new (synchronous) ExecutorCallback and/or AsyncCallback framework will let us run them in the worker or triggerer, and will give them the same level of isolation and support as a task, which the Deadline callbacks now get. The groundwork is already laid with the new ExecutorCallback framework, and the CallbackFetchMethod enum already has a DAG_ATTRIBUTE variant stubbed out for exactly this purpose.

The migration would involve:

1. When the scheduler/DagRun detects a callback is needed (e.g., DAG success/failure, task retry/failure), instead of creating a DagCallbackRequest and sending it to the Dag Processor, create a Callback model record and let the scheduler queue it as an ExecuteCallback workload.
2. Extend the CallbackSubprocess (or supervise_callback, maybe??) to support the DAG_ATTRIBUTE fetch method and resolve the callback from the DAG object's attributes (e.g., dag.on_failure_callback) rather than from an import path.
3. Migrate the callback execution out of dag_processing/processor.py (the _execute_callbacks and related code paths) and into the executor path.
4. Update or deprecate the old DagCallbackRequest/TaskCallbackRequest/CallbackSink infrastructure once the new path is proven.

This is a larger piece of work that I haven't planned too deeply, so it's maybe not a "good first task" kind of job, but it would make all callback execution consistent, observable, and logged regardless of which executor you use, and it would remove callback execution load from the Dag Processor. If this interests you, I'd love to collaborate on the design and help however I can.

# HOW TO GET STARTED

1. Look at the LocalExecutor implementation in airflow-core/src/airflow/executors/local_executor.py, specifically the _execute_workload function and the ExecuteCallback branch. This is the simplest reference.
2. Look at the CeleryExecutor implementation in providers/celery/ for an example of a remote/distributed executor handling callbacks.
3. The key function you'll call is supervise_callback() from airflow.sdk.execution_time.callback_supervisor. It takes a callback id, callback_path, callback_kwargs, and an optional log_path. It forks a supervised subprocess and returns an exit code.
4. For remote executors (K8s, ECS, Batch, Lambda), the challenge is making sure supervise_callback runs on the remote worker side, not in the scheduler.
The pattern will be similar to how ExecuteTask workloads are handled, but will be slightly different for each executor.

## HOW TO VOLUNTEER

Reply to this thread (or the corresponding GitHub issue) with which executor or work item you'd like to take on. One executor per volunteer is ideal so we can parallelize the work. I'm happy to review PRs and answer questions. I'll be working on related features (Task-level Deadlines, Asset-based and Event-based Deadlines) in parallel.

Even if you're not an executor expert, this feels like a pretty well-scoped contribution with clear examples to follow and an active guide (that's me!), and it shouldn't be terribly complicated. It would be wonderful if these could all be done and ready for the next wave of provider package releases, so the rest of the executors can follow quickly behind the first two, if not at the same time.

Thanks!

### Committer

- [x] I acknowledge that I am a maintainer/committer of the Apache Airflow project.
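To make the "five smallish tasks" above concrete, here is a rough, self-contained sketch of the dispatch pattern an executor would follow. Everything below is a stand-in for illustration: the real ExecuteTask/ExecuteCallback workload classes live in Airflow, and the real supervise_callback (from airflow.sdk.execution_time.callback_supervisor) forks a supervised subprocess and returns an exit code, so the exact signatures may differ.

```python
# Sketch of the per-executor change: advertise callback support and
# dispatch ExecuteCallback workloads to the supervisor instead of raising.
# All classes and functions here are illustrative stand-ins, not Airflow code.
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ExecuteTask:
    """Stand-in for the existing task workload."""
    ti_id: str


@dataclass
class ExecuteCallback:
    """Stand-in for the callback workload described in the issue."""
    id: str
    callback_path: str                       # dotted import path of the callable
    callback_kwargs: dict = field(default_factory=dict)
    log_path: str | None = None


def supervise_callback(id, callback_path, callback_kwargs, log_path=None) -> int:
    """Stand-in: the real helper forks a supervised subprocess and
    returns its exit code."""
    return 0


class SketchExecutor:
    # Step 1: advertise callback support on the executor class.
    supports_callbacks = True

    def _process_workloads(self, workloads) -> list[int]:
        results = []
        for workload in workloads:
            if isinstance(workload, ExecuteTask):
                results.append(self._run_task(workload))
            elif isinstance(workload, ExecuteCallback):
                # Step 2: instead of raising RuntimeError/NotImplementedError,
                # hand the callback to the supervisor on the worker side.
                results.append(
                    supervise_callback(
                        workload.id,
                        workload.callback_path,
                        workload.callback_kwargs,
                        log_path=workload.log_path,
                    )
                )
            else:
                raise RuntimeError(f"Unknown workload type: {type(workload)}")
        return results

    def _run_task(self, workload: ExecuteTask) -> int:
        """Stand-in for the executor's existing task-execution path."""
        return 0
```

For the remote executors, the isinstance branch would not call supervise_callback in-process like this; it would ship the workload to the remote worker (pod, ECS/Batch task, Lambda invocation, Edge worker) where supervise_callback actually runs, per point 4 above.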
