GitHub user hnomichith added a comment to the discussion: Add the ability to backfill a DAG based on past Asset Events
I think @peter's use case is solved by the task-clearing feature, right? For @ldacey's use case, your last message makes me think it is solved as well, right? I'm not sure how it relates to the previously mentioned use cases.

About the initial need to process past Asset Events: I requested an account to create an AIP. As suggested in the AIP instructions, I first wrote an email; if you are interested, here is my draft. I was a bit lost because the AIP instructions about major changes implicitly exclude changes to the Airflow 3 REST API, since they only mention the Airflow 2 REST API. But I guess it's still worth asking whether they have more insight into the potential consequences, or whether it would not work.

```markdown
# Modify Airflow 3 REST endpoints in order to queue past Asset Events for a DAG

## Motivation

Let's consider three DAGs: `DAG_1`, `DAG_2` and `DAG_3`. `DAG_1` creates some data at an irregular frequency; those data must then be processed by `DAG_2` and `DAG_3`. This use case seems addressed by Airflow Assets: `DAG_1` updates an Airflow Asset, and this update triggers `DAG_2` and `DAG_3`.

The problem: `DAG_2` or `DAG_3` can be created later than `DAG_1`, but they still need to process the past data updates. To properly address this use case, we need a way to trigger `DAG_2` or `DAG_3` based on past Asset Events. Additionally, `DAG_2` and `DAG_3` should be triggerable independently, since nothing should conceptually tie them together.

## Considerations

### What change do you propose to make?

I propose to modify the Airflow 3 REST endpoints.

#### Add `POST /api/v2/dags/:dag_id/assets/queuedEvents`

This endpoint would take as input a list of existing Asset Event IDs. It would then queue the corresponding Asset Events for the DAG, as if they had just happened.
As a consequence, from what I understand of [the scheduler code](https://github.com/apache/airflow/blob/c8aa74a19e39be75800ebd13289bf0713b9718b4/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L1728), this will create DAG runs (if the DAG reacts to this Asset Event only).

#### Add a `name` filter to `GET /api/v2/assets/events`

In order to programmatically find the relevant Asset Event IDs, I would need the ability to filter Asset Events by name.

### Why is it needed?

Thanks to those two endpoints, I could write a script that would:

1. Get the list of past Asset Events for a given Asset, using `GET /api/v2/assets/events?name=...` (with timestamp filters).
2. For each of those Asset Events, call `POST /api/v2/dags/:dag_id/assets/queuedEvents` to queue it for the DAG.

### Are there any downsides to this change?

From my understanding, this will create more features to maintain and add complexity to the API.

This idea was originally discussed in [Airflow GitHub](https://github.com/apache/airflow/discussions/59886), where I was advised to create an AIP even though it might not qualify as a ["major change that needs an AIP"](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89066602#AirflowImprovementProposals-WhatisconsideredamajorchangethatneedsanAIP?). Here is the rationale:

> Possibly, you could use QueuedEvents, but this is not a question of "tweaking an API" - but designing and implementing huge new feature, considering data usage, consequences, storage mechanisms and persistency of the data as well as performance impact of such re-valuations.

Therefore, I'm counting on this AIP to identify the potential consequences of this change, and to discuss the best way to implement it.

### Which users are affected by the change?

It shouldn't affect anyone negatively, since these are non-breaking additions to the API. It would simply allow users to trigger DAG runs based on past Asset Events, which is currently not possible.
### How are users affected by the change? (e.g. DB upgrade required?)

Not affected.

### What is the level of migration effort (manual and automated) needed for the users to adapt to the breaking changes? (especially in context of Airflow 3)

None.

### Other considerations?

There is currently a possible workaround for this use case. Still using a script and the Airflow 3 REST API, users can:

1. Pause all DAGs but the one they want to trigger (e.g. to trigger `DAG_2`, pause `DAG_1` and `DAG_3`).
2. Get the list of past Asset Events, using the `source_dag_id` filter to only get the events related to `DAG_1`.
3. For each of those Asset Events, call `POST /api/v2/assets/events` to create a synthetic Asset Event with the same content as the past one.
4. Unpause the DAGs that were previously paused.

This will trigger `DAG_2` for each of those past events, without re-triggering `DAG_3`. However, I think this is a hack of the Asset Events concept, since it creates Asset Events that don't correspond to real data updates. Moreover, it has drawbacks:

- It does not work with cross-deployment DAGs.
- Triggering it twice will create duplicate events that must be handled separately.

### What defines this AIP as "done"?

The two endpoints are implemented and documented, meeting Airflow's usual quality standards.
```

GitHub link: https://github.com/apache/airflow/discussions/59886#discussioncomment-15913495
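To make the two-step script in the draft concrete, here is a minimal Python sketch of it. Everything it calls is assumed, not existing: the `name` query filter, the `POST /api/v2/dags/:dag_id/assets/queuedEvents` endpoint, and the response/body field names (`asset_events`, `asset_event_ids`) are all shapes proposed or imagined by this draft. The HTTP client is injected (any `requests.Session`-like object) so the sketch stays self-contained.

```python
def backfill_dag_from_asset_events(session, base_url, dag_id, asset_name,
                                   timestamp_gte, timestamp_lte):
    """Queue every past Asset Event of `asset_name` for `dag_id`.

    `session` is any requests-style object exposing .get() and .post().
    Both endpoints used here are *proposed* by the AIP draft above.
    """
    # Step 1: list past Asset Events for the asset (proposed `name` filter).
    resp = session.get(
        f"{base_url}/api/v2/assets/events",
        params={
            "name": asset_name,          # proposed filter, does not exist yet
            "timestamp_gte": timestamp_gte,
            "timestamp_lte": timestamp_lte,
        },
    )
    resp.raise_for_status()

    # Step 2: queue each event for the DAG (proposed endpoint; the
    # `asset_event_ids` body field is an assumed shape).
    queued_ids = []
    for event in resp.json()["asset_events"]:
        r = session.post(
            f"{base_url}/api/v2/dags/{dag_id}/assets/queuedEvents",
            json={"asset_event_ids": [event["id"]]},
        )
        r.raise_for_status()
        queued_ids.append(event["id"])
    return queued_ids
```

Returning the queued IDs makes the script idempotency-friendly: a caller can log them and skip already-queued events on a re-run.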
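The four-step workaround from "Other considerations?" can be sketched the same way. Here I am assuming today's API shapes (`PATCH /api/v2/dags/{dag_id}` with an `is_paused` body, `GET /api/v2/assets/events` with a `source_dag_id` filter, and `POST /api/v2/assets/events` with an `asset_id` body); the exact field names should be checked against the Airflow 3 API reference before use. The client is again injected.

```python
def replay_events_via_workaround(session, base_url, source_dag_id, dags_to_pause):
    """Re-emit past Asset Events as synthetic ones so that only the
    still-unpaused consumer DAG is triggered.

    Endpoint and field names are assumptions to be verified against the
    Airflow 3 REST API reference.
    """
    # 1. Pause every consumer DAG except the target.
    for dag_id in dags_to_pause:
        session.patch(f"{base_url}/api/v2/dags/{dag_id}",
                      json={"is_paused": True}).raise_for_status()

    # 2. List the past events produced by the source DAG.
    resp = session.get(f"{base_url}/api/v2/assets/events",
                       params={"source_dag_id": source_dag_id})
    resp.raise_for_status()

    # 3. Re-create each event as a synthetic one; each creation triggers
    #    the unpaused consumer DAG.
    created = 0
    for event in resp.json()["asset_events"]:
        session.post(f"{base_url}/api/v2/assets/events",
                     json={"asset_id": event["asset_id"],
                           "extra": event.get("extra") or {}}).raise_for_status()
        created += 1

    # 4. Unpause the previously paused DAGs.
    for dag_id in dags_to_pause:
        session.patch(f"{base_url}/api/v2/dags/{dag_id}",
                      json={"is_paused": False}).raise_for_status()
    return created
```

Note the fragility the draft points out: if the script dies between steps 1 and 4, the other DAGs stay paused, and running it twice duplicates events, since the synthetic events are indistinguishable from real updates.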
