Hi Sebastian,
thanks for kicking off the discussion. I am 100% for this. I have always
wondered why it was implemented differently.
My only worry is that it will add some complexity, but it will be for the
greater good.
Jens
On 20.03.26 14:40, Sebastian Daum wrote:
Hello community,
I'd like to discuss moving Dag-level and task-level callbacks from the Dag
Processor to either the Worker or Triggerer.
Background
Airflow's new ExecutorCallback framework, introduced with Deadline Alerts
[1], provides a flexible approach for running callbacks. Deadline Callbacks
can now run within the Triggerer (for async code) or on a Worker as
scheduled callbacks (for synchronous code).
This new Callback Framework opens up the possibility of moving all Dag
callbacks and task callbacks from the Dag Processor to the Worker and
Triggerer. This change would give callbacks the same level of isolation and
support as standard task workloads while freeing the Dag Processor from
callback execution responsibilities.
This topic has appeared in previous devlist discussions, and there's an
open issue covering at least Dag-level callbacks [2]. AIP-43 (Dag Processor
separation) explored running task callbacks in a Worker [3], though it also
highlighted potential downsides, particularly for the KubernetesExecutor
where spinning up a new pod for callback execution creates overhead
compared to using the existing Dag Processor.
Current Implementation
Currently, both the Scheduler and Triggerer create CallbackRequests
(DagCallbackRequest | TaskCallbackRequest | EmailRequest) and send them to
the database using DbCallbackRequest/DagProcessorCallback.
The DagFileProcessorManager fetches stored DbCallbackRequests, deserializes
them, and adds the corresponding Dag files to the file_queue together with
their callback requests. A DagFileProcessorProcess then picks up each file
and executes its callbacks.
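To make the current path concrete, here is a heavily simplified Python model of it. It is not Airflow code: `CallbackRequest` below is a hypothetical stand-in with only three fields, and `queue_callbacks` / `process_next_file` are invented names that assume stored rows are JSON and that callbacks are grouped per Dag file before a processor runs them.

```python
import json
from collections import defaultdict, deque
from dataclasses import dataclass


@dataclass
class CallbackRequest:
    # Hypothetical, simplified stand-in for DagCallbackRequest /
    # TaskCallbackRequest; not the real Airflow class.
    filepath: str  # Dag file the callback belongs to
    kind: str      # "dag" or "task" in this sketch
    msg: str


def queue_callbacks(stored_rows, file_queue, callbacks_by_file):
    """Deserialize stored rows and queue each affected Dag file once."""
    for row in stored_rows:
        request = CallbackRequest(**json.loads(row))
        callbacks_by_file[request.filepath].append(request)
        if request.filepath not in file_queue:
            file_queue.append(request.filepath)


def process_next_file(file_queue, callbacks_by_file, handlers):
    """Pop one Dag file and run every callback queued for it."""
    filepath = file_queue.popleft()
    requests = callbacks_by_file.pop(filepath, [])
    results = [handlers[req.kind](req) for req in requests]
    return filepath, results


# Usage: three stored callbacks touching two Dag files.
rows = [
    json.dumps({"filepath": "dags/a.py", "kind": "dag", "msg": "failed"}),
    json.dumps({"filepath": "dags/a.py", "kind": "task", "msg": "retry"}),
    json.dumps({"filepath": "dags/b.py", "kind": "dag", "msg": "success"}),
]
file_queue: deque = deque()
callbacks_by_file: defaultdict = defaultdict(list)
queue_callbacks(rows, file_queue, callbacks_by_file)
handlers = {"dag": lambda r: f"dag:{r.msg}", "task": lambda r: f"task:{r.msg}"}
first_file, first_results = process_next_file(
    file_queue, callbacks_by_file, handlers
)
```

The shape of this model is what motivates the proposal: callbacks execute inside the process that handles the Dag file, so they compete with parsing work instead of getting the isolation that task workloads get.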
Proposed Changes
Here are the key steps needed to implement this change:
1. Add new Airflow configuration: Add a new configuration option (e.g.,
use_worker_callbacks or similar) that allows users to opt into the new
callback execution behavior while maintaining the existing Dag Processor
approach as the default.
2. Callback Wrapping (Task-SDK): Wrap Dag and task callables in
SyncCallable or AsyncCallable if they are not already of the new type.
This maintains backwards compatibility and preserves the current Dag
authoring experience.
3. Serialization Updates: Adjust Dag serialization to include Dag/Task
on_*_callback references. Like Deadline Callbacks, we would serialize only
the reference to the callable, not the callable itself.
4. Callback Consolidation: Combine the different DagProcessorCallback
implementations with ExecutorCallbacks and TriggererCallbacks.
5. Improved Typing: Enhance typing and class separation within the Callback
data structure. Currently, we differentiate between CallbackType:
Triggerer, Executor, and DAG Processor. It makes sense to implement better
type discrimination for DagCallback, TaskCallback, and potentially more
specific types like DagSuccessCallback, DagErrorCallback,
TaskSuccessCallback, EmailCallback, etc. This might warrant adding a new
table column and reconsidering the CallbackType field structure.
6. Component Updates: Modify the Scheduler and Triggerer to create
ExecutorCallbacks or TriggererCallbacks and store them in the database
instead of sending CallbackRequests via the DatabaseSink.
7. Integration: Leverage the existing logic for running Deadline Callbacks.
However, if we decide to run callbacks on workers, we'll need to determine
how to handle and prioritize callbacks versus standard task workloads, as
well as different callback types (Deadline Alerts, Task Level Callbacks vs.
Dag Level Callbacks) in the scheduler. This connects to ongoing discussions
about 'Deadline Callback queuing and priority' [4].
8. Deprecation: Deprecate the existing
DagCallbackRequest/TaskCallbackRequest/CallbackSink process.
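Items 2 and 3 can be illustrated together. The sketch below assumes a callback is identified only by its import path plus a sync/async flag. `CallbackRef`, `wrap_callback` and `serialize_callbacks` are hypothetical names playing the role of SyncCallable/AsyncCallable; the real Task SDK signatures are not assumed here. Standard-library functions stand in for user callbacks so the references resolve in any environment.

```python
import asyncio
import importlib
import inspect
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class CallbackRef:
    """Hypothetical wrapper: stores only the import path, not the callable."""
    path: str
    is_async: bool

    def serialize(self) -> dict:
        return {"path": self.path, "is_async": self.is_async}

    def resolve(self):
        # Import the callable at execution time (e.g. on a worker/triggerer).
        module_name, _, attr = self.path.rpartition(".")
        return getattr(importlib.import_module(module_name), attr)


def wrap_callback(cb) -> CallbackRef:
    """Wrap a plain callable; pass through anything already wrapped."""
    if isinstance(cb, CallbackRef):
        return cb
    qualname = cb.__qualname__
    # Lambdas, nested functions and methods cannot be looked up by a
    # simple "module.attr" path in this sketch, so reject them.
    if "." in qualname or "<" in qualname:
        raise ValueError(f"callback {qualname!r} is not importable by path")
    return CallbackRef(
        path=f"{cb.__module__}.{qualname}",
        is_async=inspect.iscoroutinefunction(cb),
    )


def serialize_callbacks(callbacks: dict) -> dict:
    """Map on_*_callback names to serialized references."""
    return {name: wrap_callback(cb).serialize() for name, cb in callbacks.items()}


# Usage: one sync and one async callable, plus an already-wrapped reference.
already = CallbackRef(path="json.loads", is_async=False)
serialized = serialize_callbacks({
    "on_success_callback": json.dumps,      # sync callable gets wrapped
    "on_failure_callback": asyncio.sleep,   # coroutine function, flagged async
    "on_retry_callback": already,           # passed through unchanged
})
```

Rejecting lambdas and nested functions is a choice made for this sketch: a reference-only format cannot point at them, so some validation of that kind would likely be needed when wrapping existing on_*_callback values.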
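Items 1, 5 and 6 meet at the point where a stored callback is dispatched. The following sketch assumes a flat `CallbackKind` enum stored with each callback (one possible shape for the new column mentioned in item 5). It treats the placeholder option `use_worker_callbacks` from item 1 as a plain boolean and sends async callables to the Triggerer and sync ones to the executor, mirroring how Deadline Callbacks are described above. `Destination`, `Callback` and `route` are hypothetical names.

```python
from dataclasses import dataclass
from enum import Enum


class CallbackKind(Enum):
    # Finer-grained than the current Triggerer/Executor/DAG Processor split.
    DAG_SUCCESS = "dag_success"
    DAG_FAILURE = "dag_failure"
    TASK_SUCCESS = "task_success"
    TASK_FAILURE = "task_failure"
    EMAIL = "email"
    DEADLINE = "deadline"


class Destination(Enum):
    DAG_PROCESSOR = "dag_processor"
    EXECUTOR = "executor"
    TRIGGERER = "triggerer"


@dataclass(frozen=True)
class Callback:
    kind: CallbackKind
    path: str      # import path of the user callable
    is_async: bool


def route(callback: Callback, use_worker_callbacks: bool) -> Destination:
    """Decide where a callback runs; the flag models the opt-in config."""
    # Deadline callbacks already run on the Triggerer or a Worker, so the
    # opt-in flag only changes where Dag/task/email callbacks go.
    if callback.kind is not CallbackKind.DEADLINE and not use_worker_callbacks:
        return Destination.DAG_PROCESSOR
    return Destination.TRIGGERER if callback.is_async else Destination.EXECUTOR


# Usage: the import paths below are hypothetical examples.
task_failure = Callback(CallbackKind.TASK_FAILURE, "my_pkg.alerts.notify", False)
dag_failure = Callback(CallbackKind.DAG_FAILURE, "my_pkg.alerts.anotify", True)
deadline = Callback(CallbackKind.DEADLINE, "my_pkg.alerts.page", True)
```

Prioritization (item 7) is deliberately not modelled: `route` only decides where a callback runs, not when it runs relative to standard task workloads.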
I'd appreciate your thoughts on this proposal, particularly around:
- Any concerns about the migration path
- The prioritization strategy for callbacks vs. standard tasks
Thanks for your consideration!
Sebastian
P.S. Many thanks to ferruzzi and shivaam for the discussions and feedback.
--------------------------------------------------------
[1] ExecutorCallback Framework: Executor Synchronous callback workload, PR #61153
<https://github.com/apache/airflow/pull/61153>
[2] Move dag-level callbacks to worker, Issue #44354
<https://github.com/apache/airflow/issues/44354>
[3] AIP-43 Dag Processor separation
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation>
[4] AIP-86 Deadline Callback queuing and priority
<https://lists.apache.org/thread/85zydrb5sc61gmgktm991jmjqvb78x7w>