Are there any further thoughts on this topic?

On Fri, 20 Mar 2026, 23:33, Ferruzzi, Dennis <[email protected]> wrote:
> Cool, just checking. Yeah, I like the idea and would also like to make
> sure that when we say we're putting a config option in there, there is some
> clearly documented deprecation path so that the new behavior will
> eventually be the default. Naming is hard, but I wonder if a better config
> flag might be a bool named `use_legacy_callbacks`? Either way, that's an
> implementation detail we don't necessarily need to hash out here.
>
> - ferruzzi
> ________________________________
> From: Sebastian Daum <[email protected]>
> Sent: Friday, March 20, 2026 2:07 PM
> To: [email protected] <[email protected]>
> Subject: RE: [EXT] [DISCUSS] Should Dag-level and task-level callbacks be
> moved to Worker or Triggerer?
>
> Yes, that's exactly the idea. It would work as described. Furthermore (and
> I've looked into this), we can internally wrap the callable as a
> SyncCallback or AsyncCallback. This would also support the current way of
> defining and passing callbacks in a DAG or task. Of course, only if you
> enable this feature.
>
> On Fri, 20 Mar 2026, 20:27, Ferruzzi, Dennis <[email protected]> wrote:
>
> > As mentioned, I helped with the general idea and think it's the right
> > direction; it's something I saw as the logical next step once the Deadline
> > callbacks were sorted out. In the new implementation, the user would
> > define the on_*_callback as either a SyncCallback or an AsyncCallback, and
> > by that decision they determine where the callback runs, just like a
> > deadline callback, right?
> >
> > with DAG(
> >     dag_id="example_callback",
> >     on_success_callback=SyncCallback(dag_success_alert),
> >     default_args={"on_execute_callback": AsyncCallback(task_execute_callback)},
> > ):
> >     task1 = EmptyOperator(
> >         task_id="task1",
> >         on_failure_callback=[SyncCallback(task_failure_alert)],
> >     )
> >
> > > the greater good.
> >
> > The greater goood
> > ________________________________
> > From: Jens Scheffler <[email protected]>
> > Sent: Friday, March 20, 2026 11:53 AM
> > To: [email protected] <[email protected]>
> > Subject: RE: [EXT] [DISCUSS] Should Dag-level and task-level callbacks be
> > moved to Worker or Triggerer?
> >
> > Hi Sebastian,
> >
> > thanks for kicking off the discussion. I am 100% for this. I have always
> > wondered why it was implemented differently.
> >
> > I just fear a bit of complexity will come, but it will be for the
> > greater good.
> >
> > Jens
> >
> > On 20.03.26 14:40, Sebastian Daum wrote:
> > > Hello community,
> > >
> > > I'd like to discuss moving Dag-level and task-level callbacks from the
> > > Dag Processor to either the Worker or the Triggerer.
> > >
> > >
> > > Background
> > >
> > > Airflow's new ExecutorCallback framework, introduced with Deadline Alerts
> > > [1], provides a flexible approach for running callbacks. Deadline
> > > Callbacks can now run within the Triggerer (for async code) or on a
> > > Worker as scheduled callbacks (for synchronous code).
> > >
> > > This new Callback Framework opens up the possibility of moving all Dag
> > > callbacks and task callbacks from the Dag Processor to the Worker and
> > > Triggerer. This change would give callbacks the same level of isolation
> > > and support as standard task workloads while freeing the Dag Processor
> > > from callback execution responsibilities.
> > >
> > > This topic has appeared in previous devlist discussions, and there's an
> > > open issue covering at least Dag-level callbacks [2]. AIP-43 (Dag
> > > Processor separation) explored running task callbacks in a Worker [3],
> > > though it also highlighted potential downsides, particularly for the
> > > KubernetesExecutor, where spinning up a new pod for callback execution
> > > creates overhead compared to using the existing Dag Processor.
> > >
> > >
> > > Current Implementation
> > >
> > > Currently, both the Scheduler and the Triggerer create CallbackRequests
> > > (DagCallbackRequest | TaskCallbackRequest | EmailRequest) and send them
> > > to the database using DbCallbackRequest/DagProcessorCallback.
> > >
> > > The DagFileProcessorManager fetches stored DbCallbackRequests,
> > > deserializes them, and sends them to the file_queue along with the
> > > callback request. A DagFileProcessorProcess then picks them up and
> > > executes them.
> > >
> > >
> > > Proposed Changes
> > >
> > > Here are the key steps needed to implement this change:
> > >
> > > 1. Add new Airflow configuration: Add a new configuration option (e.g.,
> > > use_worker_callbacks or similar) that allows users to opt into the new
> > > callback execution behavior while keeping the existing Dag Processor
> > > approach as the default.
> > >
> > > 2. Callback Wrapping (Task-SDK): Wrap Dag and task callables in
> > > SyncCallback or AsyncCallback if they are not already provided as one of
> > > the new types. This maintains backwards compatibility and preserves the
> > > current Dag authoring experience.
> > >
> > > 3. Serialization Updates: Adjust Dag serialization to include the
> > > Dag/task on_*_callback references. As with Deadline Callbacks, we would
> > > serialize only the reference to the callable, not the callable itself.
> > >
> > > 4. Callback Consolidation: Combine the different DagProcessorCallback
> > > implementations with ExecutorCallbacks and TriggererCallbacks.
> > >
> > > 5. Improved Typing: Enhance typing and class separation within the
> > > Callback data structure. Currently, the CallbackType field only
> > > distinguishes between Triggerer, Executor, and DAG Processor callbacks.
> > > It makes sense to implement better type discrimination for DagCallback,
> > > TaskCallback, and potentially more specific types like
> > > DagSuccessCallback, DagErrorCallback, TaskSuccessCallback,
> > > EmailCallback, etc. This might warrant adding a new table column and
> > > reconsidering the CallbackType field structure.
> > >
> > > 6. Component Updates: Modify the Scheduler and Triggerer to store
> > > ExecutorCallbacks or TriggererCallbacks in the database instead of
> > > sending CallbackRequests via the DatabaseSink.
> > >
> > > 7. Integration: Leverage the existing logic for running Deadline
> > > Callbacks. However, if we decide to run callbacks on workers, we'll need
> > > to determine how the scheduler should handle and prioritize callbacks
> > > versus standard task workloads, as well as the different callback types
> > > (Deadline Alerts, task-level callbacks, and Dag-level callbacks). This
> > > connects to ongoing discussions about 'Deadline Callback queuing and
> > > priority' [4].
> > >
> > > 8. Deprecation: Deprecate the existing
> > > DagCallbackRequest/TaskCallbackRequest/CallbackSink process.
> > >
> > > I'd appreciate your thoughts on this proposal, particularly around:
> > >
> > > - Any concerns about the migration path
> > >
> > > - The prioritization strategy for callbacks vs. standard tasks
> > >
> > >
> > > Thanks for your consideration!
> > >
> > > Sebastian
> > >
> > >
> > > P.S. Many thanks to ferruzzi and shivaam for the discussions and
> > > feedback.
> > >
> > >
> > > --------------------------------------------------------
> > >
> > > [1] ExecutorCallback Framework: Executor Synchronous callback workload,
> > >     PR #61153 <https://github.com/apache/airflow/pull/61153>
> > >
> > > [2] Move dag-level callbacks to worker: Issue #44354
> > >     <https://github.com/apache/airflow/issues/44354>
> > >
> > > [3] AIP-43 Dag Processor separation
> > >     <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation>
> > >
> > > [4] AIP-86 Deadline Callback queuing and priority
> > >     <https://lists.apache.org/thread/85zydrb5sc61gmgktm991jmjqvb78x7w>
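The wrapping and routing idea discussed in this thread (step 2, Sebastian's note about internally wrapping plain callables, and Ferruzzi's question about where each type runs) can be sketched in plain Python. This is a minimal sketch, not the Task SDK implementation: SyncCallback and AsyncCallback here are simplified local stand-ins, and the rule that coroutine functions become AsyncCallback, the helper names (wrap_callback, normalize_callbacks, route_callback), and the use_worker_callbacks flag are all hypothetical, chosen only for illustration.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Union


# Simplified, hypothetical stand-ins for the SyncCallback/AsyncCallback types
# discussed in the thread; they are NOT the Task SDK classes.
@dataclass(frozen=True)
class SyncCallback:
    func: Callable[..., Any]


@dataclass(frozen=True)
class AsyncCallback:
    func: Callable[..., Any]


Wrapped = Union[SyncCallback, AsyncCallback]
CallbackSpec = Union[Callable[..., Any], Wrapped]


def wrap_callback(cb: CallbackSpec) -> Wrapped:
    """Wrap a plain callable; already-wrapped callbacks pass through unchanged."""
    if isinstance(cb, (SyncCallback, AsyncCallback)):
        return cb
    # Assumption: coroutine functions become AsyncCallback, all others SyncCallback.
    if inspect.iscoroutinefunction(cb):
        return AsyncCallback(cb)
    return SyncCallback(cb)


def normalize_callbacks(
    callbacks: Optional[Union[CallbackSpec, List[CallbackSpec]]],
) -> List[Wrapped]:
    """Accept None, a single callback, or a list, and wrap each entry."""
    if callbacks is None:
        return []
    if not isinstance(callbacks, list):
        callbacks = [callbacks]
    return [wrap_callback(cb) for cb in callbacks]


def route_callback(cb: Wrapped, use_worker_callbacks: bool) -> str:
    """Name the component that would run the callback (hypothetical flag)."""
    if not use_worker_callbacks:
        return "dag_processor"  # today's default path
    return "triggerer" if isinstance(cb, AsyncCallback) else "worker"


if __name__ == "__main__":
    def task_failure_alert(context):
        print("task failed:", context)

    async def task_execute_callback(context):
        print("task started:", context)

    wrapped = normalize_callbacks([task_failure_alert, AsyncCallback(task_execute_callback)])
    print([route_callback(cb, use_worker_callbacks=True) for cb in wrapped])
    # prints ['worker', 'triggerer']
```

With the flag off, every callback stays on the Dag Processor path, matching the opt-in default described in step 1. Framing the option as `use_legacy_callbacks` instead, as suggested in the thread, would only invert the boolean, not the routing logic.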

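Step 3 of the proposal says only a reference to each callback would be serialized, not the callable itself. A minimal sketch of that idea, assuming a dotted "module.function" import path as the reference format: the JSON payload shape, the hypothetical "kind" field (loosely echoing the type discrimination in step 5), and all helper names are illustrative, not Airflow's serialization format. Standard-library functions stand in for real callbacks so the example runs anywhere.

```python
import importlib
import json
from typing import Any, Callable, List


def callable_to_path(func: Callable[..., Any]) -> str:
    """Return a dotted import path such as 'package.module.func_name'."""
    qualname = func.__qualname__
    # Only module-level functions can be re-imported from "module.name";
    # lambdas, nested functions and methods are rejected in this sketch.
    if "." in qualname or qualname == "<lambda>":
        raise ValueError(f"{qualname!r} cannot be serialized by reference")
    return f"{func.__module__}.{qualname}"


def path_to_callable(path: str) -> Callable[..., Any]:
    """Import the module part of the path and look up the function by name."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def serialize_callbacks(kind: str, funcs: List[Callable[..., Any]]) -> str:
    # Only references are stored; the function bodies stay in their modules.
    return json.dumps({"kind": kind, "paths": [callable_to_path(f) for f in funcs]})


def deserialize_callbacks(payload: str) -> List[Callable[..., Any]]:
    data = json.loads(payload)
    return [path_to_callable(p) for p in data["paths"]]


if __name__ == "__main__":
    payload = serialize_callbacks("task_failure", [json.dumps, json.loads])
    print(payload)
    # prints {"kind": "task_failure", "paths": ["json.dumps", "json.loads"]}
    assert deserialize_callbacks(payload) == [json.dumps, json.loads]
```

Storing only the path keeps the serialized Dag small and free of pickled code, at the cost that the referenced module must be importable on whichever component (Worker or Triggerer) ends up running the callback.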