As mentioned, I helped with the general idea and think it's the right
direction; I saw it as the logical next step once the Deadline callbacks
were sorted out. In the new implementation, the user would define each
on_*_callback as either a SyncCallback or an AsyncCallback, and that choice
determines where the callback runs, just like a deadline callback, right?


with DAG(
    dag_id="example_callback",
    on_success_callback=SyncCallback(dag_success_alert),
    default_args={"on_execute_callback": AsyncCallback(task_execute_callback)},
):
    task1 = EmptyOperator(
        task_id="task1",
        on_failure_callback=[SyncCallback(task_failure_alert)],
    )
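
To make sure we mean the same thing, here is a rough sketch of the wrapping
and routing I have in mind. The two classes are hypothetical stand-ins so the
snippet runs without Airflow, and wrap_callback / target_component are names I
made up for illustration; the real implementation may look quite different.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable


# Hypothetical stand-ins for the real wrapper classes (assumption: they
# simply hold a reference to the user's callable).
@dataclass(frozen=True)
class SyncCallback:
    func: Callable[..., Any]


@dataclass(frozen=True)
class AsyncCallback:
    func: Callable[..., Any]


def wrap_callback(cb):
    """Keep explicit wrappers as-is; infer a wrapper for bare callables."""
    if isinstance(cb, (SyncCallback, AsyncCallback)):
        return cb  # the user's explicit choice wins
    if inspect.iscoroutinefunction(cb):
        return AsyncCallback(cb)
    return SyncCallback(cb)


def target_component(cb) -> str:
    """Assumed routing rule: async runs on the Triggerer, sync on a Worker."""
    wrapped = wrap_callback(cb)
    return "triggerer" if isinstance(wrapped, AsyncCallback) else "worker"


def task_failure_alert(context):
    print("task failed")


async def task_execute_callback(context):
    print("task starting")
```

With this, a bare task_failure_alert would land on a worker and a bare async
task_execute_callback on the triggerer, while existing Dags that pass plain
callables keep working unchanged.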



> the greater good.
The greater goood
________________________________
From: Jens Scheffler <[email protected]>
Sent: Friday, March 20, 2026 11:53 AM
To: [email protected] <[email protected]>
Subject: RE: [EXT] [DISCUSS] Should Dag-level and task-level callbacks be moved 
to Worker or Triggerer?


Hi Sebastian,

Thanks for kicking off the discussion. I am 100% for this. I have always
wondered why it was implemented differently.

My only fear is that it will add some complexity, but it will be for the
greater good.

Jens

On 20.03.26 14:40, Sebastian Daum wrote:
> Hello community,
>
> I'd like to discuss moving Dag-level and task-level callbacks from the Dag
> Processor to either the Worker or Triggerer.
>
>
> Background
>
> Airflow's new ExecutorCallback framework, introduced with Deadline Alerts
> [1], provides a flexible approach for running callbacks. Deadline Callbacks
> can now run within the Triggerer (for async code) or on a Worker as
> scheduled callbacks (for synchronous code).
>
> This new Callback Framework opens up the possibility of moving all Dag
> callbacks and task callbacks from the Dag Processor to the Worker and
> Triggerer. This change would give callbacks the same level of isolation and
> support as standard task workloads while freeing the Dag Processor from
> callback execution responsibilities.
>
> This topic has appeared in previous devlist discussions, and there's an
> open issue covering at least Dag-level callbacks [2]. AIP-43 (Dag Processor
> separation) explored running task callbacks in a Worker [3], though it also
> highlighted potential downsides, particularly for the KubernetesExecutor
> where spinning up a new pod for callback execution creates overhead
> compared to using the existing Dag Processor.
>
>
> Current Implementation
>
> Currently, both the Scheduler and Triggerer create CallbackRequests
> (DagCallbackRequest | TaskCallbackRequest | EmailRequest) and send them to
> the database using DbCallbackRequest/DagProcessorCallback.
>
> The DagFileProcessorManager fetches stored DbCallbackRequests, deserializes
> them, and sends them to the file_queue along with the callback request. A
> DagFileProcessorProcess then picks them up and executes them.
>
>
> Proposed Changes
>
> Here are the key steps needed to implement this change:
>
> 1. Add new Airflow configuration: Add a new configuration option (e.g.,
> use_worker_callbacks or similar) that allows users to opt into the new
> callback execution behavior while maintaining the existing Dag Processor
> approach as the default.
>
> 2. Callback Wrapping (Task-SDK): Wrap Dag and task callables in
> SyncCallback or AsyncCallback unless they are already provided as one of
> those types. This maintains backwards compatibility and preserves the
> current Dag authoring experience.
>
> 3. Serialization Updates: Adjust Dag serialization to include Dag/Task
> on_*_callback references. Like Deadline Callbacks, we would serialize only
> the reference to the callable, not the callable itself.
>
> 4. Callback Consolidation: Combine the different DagProcessorCallback
> implementations with ExecutorCallbacks and TriggererCallbacks.
>
> 5. Improved Typing: Enhance typing and class separation within the Callback
> data structure. Currently, CallbackType differentiates between
> Triggerer, Executor, and Dag Processor. It makes sense to implement better
> type discrimination for DagCallback, TaskCallback, and potentially more
> specific types like DagSuccessCallback, DagErrorCallback,
> TaskSuccessCallback, EmailCallback, etc. This might warrant adding a new
> table column and reconsidering the CallbackType field structure.
>
> 6. Component Updates: Modify the Scheduler and Triggerer to send
> ExecutorCallbacks or TriggererCallbacks and store them in the database
> instead of sending CallbackRequest via DatabaseSink.
>
> 7. Integration: Leverage the existing logic for running Deadline Callbacks.
> However, if we decide to run callbacks on workers, we'll need to determine
> how to handle and prioritize callbacks versus standard task workloads, as
> well as different callback types (Deadline Alerts, Task Level Callbacks vs.
> Dag Level Callbacks) in the scheduler. This connects to ongoing discussions
> about 'Deadline Callback queuing and priority' [4].
>
> 8. Deprecation: Deprecate the existing
> DagCallbackRequest/TaskCallbackRequest/CallbackSink process.
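
On step 3, a minimal sketch of what "serialize only the reference" could look
like, assuming callbacks are module-level functions that can be imported by
dotted path. The helper names callable_to_path and path_to_callable are made
up for illustration and are not existing Airflow functions.

```python
import importlib
import json
from typing import Any, Callable


def callable_to_path(func: Callable[..., Any]) -> str:
    """Return 'module.name' for a module-level callable.

    Lambdas, nested functions and methods are rejected because they
    cannot be re-imported from a simple dotted path.
    """
    if func.__name__ == "<lambda>" or "." in func.__qualname__:
        raise ValueError(f"{func!r} is not importable by reference")
    return f"{func.__module__}.{func.__qualname__}"


def path_to_callable(path: str) -> Callable[..., Any]:
    """Import the module and look the callable up again by name."""
    module_name, _, attr = path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# Round trip using a stdlib function as a stand-in for a user callback.
ref = callable_to_path(json.dumps)
restored = path_to_callable(ref)
```

Only the string in ref would need to be stored with the serialized Dag; the
component that runs the callback resolves it back to the callable at
execution time.
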
>
> I'd appreciate your thoughts on this proposal, particularly around:
>
> - Any concerns about the migration path
>
> - The prioritization strategy for callbacks vs. standard tasks
>
>
> Thanks for your consideration!
>
> Sebastian
>
>
> P.S. Many thanks to ferruzzi and shivaam for the discussions and feedback.
>
>
> --------------------------------------------------------
>
> [1] ExecutorCallback Framework
>
> Executor Synchronous callback workload PR #61153
> <https://github.com/apache/airflow/pull/61153>
>
> [2] Move dag-level callbacks to worker
>
> Issue #44354 <https://github.com/apache/airflow/issues/44354>
>
> [3] Dag Processor separation
>
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation
>
> [4] AIP-86 Deadline Callback queuing and priority
>
> https://lists.apache.org/thread/85zydrb5sc61gmgktm991jmjqvb78x7w
>
