ferruzzi opened a new issue, #62887:
URL: https://github.com/apache/airflow/issues/62887

   ### Body
   
   This is a tracking ticket for the email thread 
[here](https://lists.apache.org/thread/ssb54kvffc4q95h2ch9j95v10kokmwv5) 
   
   
   ## Work Remaining:
   
   | Executor | Assignee | Status | PR Link |
   |----------|----------|--------|---------|
   | Batch    | Sebastian Daum @dondaum | CLAIMED | |
   | ECS      | Shiva Rastogi @shivaam | CLAIMED | |
   | Edge     | Jeongwoo Do @wjddn279 | CLAIMED | |
   | K8S      | Kevin Yang | CLAIMED | |
   | Lambda   | Sameer Mesiah @SameerMesiah97 | CLAIMED | |
   
   
   - [ ] Convert existing Dag Processor callbacks
   
   
   A note regarding "claimed" work.  This got a lot more attention than I expected, so I'm going to put the names down for claimed executors in the order the emails came in.  Once I see a PR (please tag me), I'll move it from "claimed" to "work in progress" (WIP).  In what I believe to be the spirit of the new assignee policy, if I don't see any progress after a week I'm going to open it back up for someone else.  I'm not expecting completion in a week, but I do need to see that you are actively working on it, even if it is just asking me questions on Slack.
   
   State flow:  AVAILABLE -> CLAIMED -> WIP -> PR -> DONE
   
   ___
   
   Below the line is just copypasta of the email, for posterity
   ___
   
   Airflow 3.2 introduces ExecutorCallbacks, allowing executors to run synchronous callbacks (starting with Deadline Alerts). LocalExecutor and CeleryExecutor support is done and tested, and I'm looking for volunteers to help implement the remaining executors.  There's also an opportunity to help migrate Airflow's existing DAG/task callbacks onto this same infrastructure.
   
   ## WHAT ARE EXECUTOR CALLBACKS?
   
   Deadline Alerts allow users to set a deadline on a DAG: "if this DAG isn't 
done by this time, run this callback."  If the callback is asynchronous code, 
it goes to the Triggerer to execute.  If it's synchronous code then it now (as 
of PR #[61153](https://github.com/apache/airflow/pull/61153)) gets sent to the 
executor and run in the worker, just like a Dag task.  The scheduler queues 
them, the executor picks them up, and a supervised subprocess imports and runs 
the callback function (that last part about the supervisor is [still in 
PR](https://github.com/apache/airflow/pull/62645)).
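To make the sync/async split concrete, here is a small self-contained sketch. The callback functions and the `route` helper are purely illustrative (not real Airflow APIs); only the coroutine check is standard Python. It mirrors the routing described above: async callbacks go to the Triggerer, sync ones to the executor.

```python
import asyncio

def notify_on_deadline(**context):
    """A synchronous callback: sent to the executor and run on a worker."""
    print("DAG missed its deadline!")

async def async_notify(**context):
    """An asynchronous callback: sent to the Triggerer."""
    print("DAG missed its deadline!")

def route(callback) -> str:
    # Hypothetical routing decision, mirroring the behavior described above.
    return "triggerer" if asyncio.iscoroutinefunction(callback) else "executor"

print(route(notify_on_deadline))  # -> executor
print(route(async_notify))        # -> triggerer
```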
   
   ## WHAT'S DONE
   
     - Core framework is all done in PR #[61153](https://github.com/apache/airflow/pull/61153).  There is now an ExecutorWorkload which encompasses ExecuteTask and ExecuteCallback. Some work is still in progress, but Callbacks have almost all the same features and "rights" as an executor Task.
     - LocalExecutor is fully supported and tested, and it'll launch in 3.2.  Callbacks are sent to worker processes alongside tasks, with supervised subprocess execution [coming](https://github.com/apache/airflow/pull/62645).
     - CeleryExecutor is fully supported and tested and will also launch in 
3.2.  Callbacks are sent to Celery workers as workloads, executed via 
supervise_callback.
   
   Those implementations serve as reference examples and should give a pretty 
good idea how to handle the remaining work.
   
   # WHERE YOU CAN HELP
   
   ## FIVE SMALLISH TASKS
   
   Each of the five remaining executors needs:
     1. Set supports_callbacks = True on the executor class.
   
     2. Handle ExecuteCallback workloads in _process_workloads().  Right now 
these executors either raise RuntimeError or NotImplementedError when they see 
a non-Task workload. We need to implement sending the callback for execution 
using supervise_callback() from airflow.sdk.execution_time.callback_supervisor.
   
     3. Add tests covering the callback execution path.
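Steps 1 and 2 might look roughly like the sketch below. The workload classes here are stand-ins so the sketch runs on its own; a real implementation would import ExecuteTask/ExecuteCallback from the task SDK and hand the callback off so that supervise_callback() (from airflow.sdk.execution_time.callback_supervisor) runs on the worker side.

```python
from dataclasses import dataclass, field

@dataclass
class ExecuteTask:          # stand-in for the real workload class
    ti_key: str

@dataclass
class ExecuteCallback:      # stand-in for the real workload class
    id: str
    path: str
    kwargs: dict = field(default_factory=dict)

class MyExecutor:
    supports_callbacks = True   # step 1: advertise callback support

    def __init__(self):
        self.dispatched = []

    def _process_workloads(self, workloads):
        # Step 2: route ExecuteCallback instead of raising on non-Task workloads.
        for workload in workloads:
            if isinstance(workload, ExecuteCallback):
                # A real executor would send this to its worker, which then
                # invokes supervise_callback(...) to run the callback.
                self.dispatched.append(("callback", workload.id))
            elif isinstance(workload, ExecuteTask):
                self.dispatched.append(("task", workload.ti_key))
            else:
                raise NotImplementedError(type(workload))

executor = MyExecutor()
executor._process_workloads([ExecuteTask("ti-1"), ExecuteCallback("cb-1", "pkg.mod.fn")])
print(executor.dispatched)  # -> [('task', 'ti-1'), ('callback', 'cb-1')]
```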
   
   The specific executors that need work:
   
     - Kubernetes Executor (CNCF provider): Needs a way to run supervise_callback in a pod. May need a lightweight callback pod template, or could reuse the existing worker pod mechanism.  I don't know enough about K8S to really gauge how complicated this will be, and I could really use someone with K8S experience to take this one on.  I can definitely help with understanding the existing code, but I won't be much help on the actual change implementation.
   
     - ECS Executor (Amazon provider): Similar pattern; needs to send the callback to an ECS task.
   
     - Batch Executor (Amazon provider): Send the callback to a Batch job.
   
     - Lambda Executor (Amazon provider): Lambda is a simpler execution model, 
but callback import/execution in the Lambda runtime may be a wrinkle.
   
     - EdgeExecutor (Edge3 provider): Send the callback to an Edge worker.  Like K8S, this is another one I don't have any real experience with, so I can help with onboarding but may not be as much use on the actual implementation.
   
   Note: The hybrid executors (CeleryKubernetesExecutor, 
LocalKubernetesExecutor) are deprecated and do not need to be updated.
   
   ## ONE LARGER TASK
   
   In addition to those smaller tasks, there is one bigger one on my "Some 
Day..." list which is tangentially related.  One side-goal of the 
ExecutorCallbacks infrastructure is to unify the other callbacks onto the same 
framework.  This isn't related to DeadlineAlerts and I still have a pile of 
work to do on that feature, so perhaps I can entice someone to take on the work 
for that?
   
   Airflow's DAG-level and task-level callbacks (on_success_callback, on_failure_callback, etc.) currently flow ((current.... flow?  I'll see myself out...)) through a completely separate path.  They are packaged as DagCallbackRequest/TaskCallbackRequest and executed in the Dag Processor, which has several downsides.  Migrating them to the new (synchronous) ExecutorCallback and/or AsyncCallback framework will let us run them in the worker or triggerer, and give them the same level of isolation and support as a task, which the Deadline callbacks now get.
   
   The groundwork is already laid with the new ExecutorCallback framework and 
the CallbackFetchMethod enum already has a DAG_ATTRIBUTE variant stubbed out 
for exactly this purpose. The migration would involve:
   
     1. When the scheduler/DagRun detects a callback is needed (e.g., DAG 
success/failure, task retry/failure), instead of creating a DagCallbackRequest 
and sending it to the Dag Processor, create a Callback model record and let the 
scheduler queue it as an ExecuteCallback workload.
   
     2. Extend the CallbackSubprocess (or supervise_callback maybe??) to 
support the DAG_ATTRIBUTE fetch method and resolve the callback from the DAG 
object's attributes (e.g., dag.on_failure_callback) rather than from an import 
path.
   
     3. Migrate the callback execution out of dag_processing/processor.py (the 
_execute_callbacks and related code paths) and into the executor path.
   
     4. Update or deprecate the old 
DagCallbackRequest/TaskCallbackRequest/CallbackSink infrastructure once the new 
path is proven.
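Step 2 above, resolving the callback from a DAG attribute rather than an import path, might look something like this sketch. The enum values and the DAG class are minimal stand-ins (the real CallbackFetchMethod lives in Airflow core, and only DAG_ATTRIBUTE is confirmed by this issue to be stubbed out there); the resolver itself is a hypothetical illustration.

```python
from enum import Enum
import importlib

class CallbackFetchMethod(Enum):   # stand-in for the real enum
    IMPORT_PATH = "import_path"
    DAG_ATTRIBUTE = "dag_attribute"

class FakeDag:
    """Minimal stand-in for a DAG object carrying a callback attribute."""
    def on_failure_callback(self, **context):
        return "notified"

def resolve_callback(method, dag=None, attribute=None, import_path=None):
    if method is CallbackFetchMethod.DAG_ATTRIBUTE:
        # Look the callable up on the DAG object instead of importing it.
        return getattr(dag, attribute)
    if method is CallbackFetchMethod.IMPORT_PATH:
        module_name, _, func_name = import_path.rpartition(".")
        return getattr(importlib.import_module(module_name), func_name)
    raise ValueError(method)

cb = resolve_callback(
    CallbackFetchMethod.DAG_ATTRIBUTE,
    dag=FakeDag(),
    attribute="on_failure_callback",
)
print(cb())  # -> notified
```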
   
   This is a larger piece of work that I haven't planned too deeply, so it's maybe not a "good first task" kind of job.  But it would make all callback execution consistent, observable, and logged regardless of which executor you use, and it would remove callback execution load from the Dag Processor. If this interests you, I'd love to collaborate on the design and help however I can.
   
   # HOW TO GET STARTED
   
     1. Look at the LocalExecutor implementation in 
airflow-core/src/airflow/executors/local_executor.py, specifically the 
_execute_workload function and the ExecuteCallback branch. This is the simplest 
reference.
   
     2. Look at the CeleryExecutor implementation in providers/celery/ for an 
example of a remote/distributed executor handling callbacks.
   
     3. The key function you'll call is supervise_callback() from 
airflow.sdk.execution_time.callback_supervisor.  It takes a callback id, 
callback_path, callback_kwargs, and optional log_path.  It forks a supervised 
subprocess, and returns an exit code.
   
     4. For remote executors (K8s, ECS, Batch, Lambda), the challenge is making 
sure supervise_callback runs on the remote worker side, not in the scheduler. 
The pattern will be similar to how ExecuteTask workloads are handled, but will 
be slightly different for each executor.
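Put together, the worker-side entrypoint for a remote executor might look like the sketch below. The exact signature of supervise_callback is assumed from the description in step 3 (callback id, callback_path, callback_kwargs, optional log_path, returning an exit code), and it is stubbed here so the sketch runs standalone; the real function forks a supervised subprocess rather than running in-process.

```python
import importlib

def supervise_callback(id, callback_path, callback_kwargs, log_path=None):
    # Stub with the assumed signature: the real implementation forks a
    # supervised subprocess; here we import and invoke the callback
    # in-process just to show the flow.
    module_name, _, func_name = callback_path.rpartition(".")
    func = getattr(importlib.import_module(module_name), func_name)
    try:
        func(**callback_kwargs)
        return 0
    except Exception:
        return 1

# What a remote worker might do on receiving an ExecuteCallback workload
# (json.dumps stands in for a user callback, purely for illustration):
exit_code = supervise_callback(
    id="cb-123",
    callback_path="json.dumps",
    callback_kwargs={"obj": [1, 2]},
)
print(exit_code)  # -> 0
```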
   
   ## HOW TO VOLUNTEER
   
   Reply to this thread (or the corresponding GitHub issue) with which executor 
or work item you'd like to take on. One executor per volunteer is ideal so we 
can parallelize the work. I'm happy to review PRs and answer questions.  I'll 
be working on related features (Task-level Deadlines, Asset-based and 
Event-based Deadlines) in parallel.
   
   Even if you're not an executor expert, this feels like a pretty well-scoped contribution with clear examples to follow and an active guide (that's me!), and it shouldn't be terribly complicated.  It would be wonderful if they could all be done and ready for the next wave of provider package releases, so the rest of the executors can follow quickly behind the first two, if not at the same time.
   
   Thanks!
   
   
   ### Committer
   
   - [x] I acknowledge that I am a maintainer/committer of the Apache Airflow 
project.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
