Yes, that's exactly the idea, and it would work as described. I've also
looked into wrapping plain callables internally as a SyncCallback or
AsyncCallback, so the current way of defining and passing callbacks in a
DAG or task would keep working. All of this would apply only if you enable
the feature.
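
To make the wrapping idea a bit more concrete, here is a rough sketch of
what I have in mind. It is not the actual Task SDK code: the two dataclasses
are stand-ins for the real SyncCallback and AsyncCallback, and
wrap_callback, wrap_callbacks and the use_worker_callbacks flag are
hypothetical names (the flag name is borrowed from the example in the
proposal). Picking AsyncCallback for coroutine functions is an assumption on
my side, not something that has been decided.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable


# Stand-ins for illustration only; the real SyncCallback/AsyncCallback
# classes in Airflow have a different, richer interface.
@dataclass(frozen=True)
class SyncCallback:
    func: Callable[..., Any]


@dataclass(frozen=True)
class AsyncCallback:
    func: Callable[..., Any]


def wrap_callback(cb, use_worker_callbacks: bool):
    """Wrap a plain callable; leave it untouched when the feature is off."""
    if not use_worker_callbacks:
        return cb  # existing Dag Processor behaviour, nothing changes
    if isinstance(cb, (SyncCallback, AsyncCallback)):
        return cb  # already declared explicitly by the Dag author
    if inspect.iscoroutinefunction(cb):
        return AsyncCallback(cb)  # assumed: coroutine functions go to the Triggerer
    return SyncCallback(cb)  # assumed: everything else goes to a Worker


def wrap_callbacks(value, use_worker_callbacks: bool):
    """Accept None, a single callback, or a list/tuple of callbacks."""
    if value is None:
        return None
    if isinstance(value, (list, tuple)):
        return [wrap_callback(cb, use_worker_callbacks) for cb in value]
    return wrap_callback(value, use_worker_callbacks)
```

With the flag off, wrap_callbacks hands back exactly what the Dag author
passed in, so existing Dags would not notice any difference.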

Ferruzzi, Dennis <[email protected]> wrote on Fri, Mar 20, 2026, 20:27:

> As mentioned, I helped with the general idea and think it's the right
> direction; it's something I saw as the logical next step once the Deadline
> callbacks were sorted out. In the new implementation, the user would
> define the on_*_callback as either a SyncCallback or an AsyncCallback, and
> that choice determines where the callback runs, just like a deadline
> callback, right?
>
>
> with DAG(
>     dag_id="example_callback",
>     on_success_callback=SyncCallback(dag_success_alert),
>     default_args={"on_execute_callback": AsyncCallback(task_execute_callback)},
> ):
>     task1 = EmptyOperator(
>         task_id="task1",
>         on_failure_callback=[SyncCallback(task_failure_alert)],
>     )
>
>
>
> > the greater good.
> The greater good.
> ________________________________
> From: Jens Scheffler <[email protected]>
> Sent: Friday, March 20, 2026 11:53 AM
> To: [email protected] <[email protected]>
> Subject: RE: [EXT] [DISCUSS] Should Dag-level and task-level callbacks be
> moved to Worker or Triggerer?
>
> Hi Sebastian,
>
> thanks for kicking off the discussion. I am 100% for this. I have always
> wondered why it was implemented differently.
>
> I just fear a bit of complexity will come, but it will be for the
> greater good.
>
> Jens
>
> On 20.03.26 14:40, Sebastian Daum wrote:
> > Hello community,
> >
> > I'd like to discuss moving Dag-level and task-level callbacks from the Dag
> > Processor to either the Worker or Triggerer.
> >
> >
> > Background
> >
> > Airflow's new ExecutorCallback framework, introduced with Deadline Alerts
> > [1], provides a flexible approach for running callbacks. Deadline Callbacks
> > can now run within the Triggerer (for async code) or on a Worker as
> > scheduled callbacks (for synchronous code).
> >
> > This new Callback Framework opens up the possibility of moving all Dag
> > callbacks and task callbacks from the Dag Processor to the Worker and
> > Triggerer. This change would give callbacks the same level of isolation and
> > support as standard task workloads while freeing the Dag Processor from
> > callback execution responsibilities.
> >
> > This topic has appeared in previous devlist discussions, and there's an
> > open issue covering at least Dag-level callbacks [2]. AIP-43 (Dag Processor
> > separation) explored running task callbacks in a Worker [3], though it also
> > highlighted potential downsides, particularly for the KubernetesExecutor
> > where spinning up a new pod for callback execution creates overhead
> > compared to using the existing Dag Processor.
> >
> >
> > Current Implementation
> >
> > Currently, both the Scheduler and Triggerer create CallbackRequests
> > (DagCallbackRequest | TaskCallbackRequest | EmailRequest) and send them to
> > the database using DbCallbackRequest/DagProcessorCallback.
> >
> > The DagFileProcessorManager fetches stored DbCallbackRequests, deserializes
> > them, and sends them to the file_queue along with the callback request. A
> > DagFileProcessorProcess then picks them up and executes them.
> >
> >
> > Proposed Changes
> >
> > Here are the key steps needed to implement this change:
> >
> > 1. Add new Airflow configuration: Add a new configuration option (e.g.,
> > use_worker_callbacks or similar) that allows users to opt into the new
> > callback execution behavior while maintaining the existing Dag Processor
> > approach as the default.
> >
> > 2. Callback Wrapping (Task-SDK): Wrap Dag and Task callables in
> > SyncCallback or AsyncCallback if not already provided with the new type.
> > This maintains backwards compatibility and preserves the current Dag
> > authoring experience.
> >
> > 3. Serialization Updates: Adjust Dag serialization to include Dag/Task
> > on_*_callback references. Like Deadline Callbacks, we would serialize only
> > the reference to the callable, not the callable itself.
> >
> > 4. Callback Consolidation: Combine the different DagProcessorCallback
> > implementations with ExecutorCallbacks and TriggererCallbacks.
> >
> > 5. Improved Typing: Enhance typing and class separation within the Callback
> > data structure. Currently, we differentiate between CallbackType:
> > Triggerer, Executor, and DAG Processor. It makes sense to implement better
> > type discrimination for DagCallback, TaskCallback, and potentially more
> > specific types like DagSuccessCallback, DagErrorCallback,
> > TaskSuccessCallback, EmailCallback, etc. This might warrant adding a new
> > table column and reconsidering the CallbackType field structure.
> >
> > 6. Component Updates: Modify the Scheduler and Triggerer to send
> > ExecutorCallbacks or TriggererCallbacks and store them in the database
> > instead of sending CallbackRequest via DatabaseSink.
> >
> > 7. Integration: Leverage the existing logic for running Deadline Callbacks.
> > However, if we decide to run callbacks on workers, we'll need to determine
> > how to handle and prioritize callbacks versus standard task workloads, as
> > well as different callback types (Deadline Alerts, Task Level Callbacks vs.
> > Dag Level Callbacks) in the scheduler. This connects to ongoing discussions
> > about 'Deadline Callback queuing and priority' [4].
> >
> > 8. Deprecation: Deprecate the existing
> > DagCallbackRequest/TaskCallbackRequest/CallbackSink process.
> >
> > I'd appreciate your thoughts on this proposal, particularly around:
> >
> > - Any concerns about the migration path
> >
> > - The prioritization strategy for callbacks vs. standard tasks
> >
> >
> > Thanks for your consideration!
> >
> > Sebastian
> >
> >
> > P.S. Many thanks to ferruzzi and shivaam for the discussions and feedback.
> >
> >
> > --------------------------------------------------------
> >
> > [1] ExecutorCallback Framework: Executor Synchronous callback workload,
> > PR #61153 <https://github.com/apache/airflow/pull/61153>
> >
> > [2] Move dag-level callbacks to worker: Issue #44354
> > <https://github.com/apache/airflow/issues/44354>
> >
> > [3] Dag Processor separation
> > <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation>
> >
> > [4] AIP-86 Deadline Callback queuing and priority
> > <https://lists.apache.org/thread/85zydrb5sc61gmgktm991jmjqvb78x7w>
> >