GitHub user potiuk added a comment to the discussion: Add the ability to 
backfill a DAG based on past Asset Events

> From your second message, I really think you are mis-understanding my 
> usecase. Probably because I used the terminologies "backfill" or "catch-up" 
> which confused you. I hope the beginning of my message will give you a better 
> understanding.

Yes, you are mixing it up completely: backfill and catch-up strictly refer to 
"time intervals", and those only make sense for scheduled DAGs. 

> To circumvent it, manually calling POST 
> /api/v2/dags/DAG_2/assets/queuedEvents with the Asset Event of ID 1 would 
> simulate the DAG wasn't paused when the event happene

There is no new API needed for that. You can just trigger the asset event 
manually: 
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/create_asset_event
 -> if Dag 2 and Dag 3 are unpaused and you want to trigger it "as if Dag 1 
did it", you just have to create the asset event - exactly the same one Dag 1 
would create. Dag 1, when completing, simply creates the asset event that is its 
output. You can do the same via the API. You just need to know which asset to 
create and which extras it should have. Once you create it, Dag 2 and Dag 3 will 
be triggered.


You just need to know which assets, and with which parameters, you should 
trigger. And, as I have repeated several times, the best approach is to write a 
script that finds out (according to your criteria) which asset events, with 
which parameters, you should trigger. 
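The selection step of such a script might look like the sketch below: given asset-event records you have already fetched (for example from the asset events listing endpoint), keep the ones matching your criteria. The record field names (`asset_uri`, `timestamp`) are assumptions for illustration, not the actual API schema.

```python
# Sketch of the "find matching past events" step. Field names are
# illustrative assumptions, not the real Airflow API schema.
from datetime import datetime, timezone


def select_events(events, asset_uris, since):
    """Keep events for the given assets created at or after `since`."""
    return [
        e for e in events
        if e["asset_uri"] in asset_uris
        and datetime.fromisoformat(e["timestamp"]) >= since
    ]


events = [
    {"asset_uri": "s3://bucket/a", "timestamp": "2025-01-01T00:00:00+00:00"},
    {"asset_uri": "s3://bucket/b", "timestamp": "2025-03-01T00:00:00+00:00"},
]
cutoff = datetime(2025, 2, 1, tzinfo=timezone.utc)
picked = select_events(events, {"s3://bucket/b"}, cutoff)
# These are the events you would then re-publish via the API.
print([e["asset_uri"] for e in picked])  # → ['s3://bucket/b']
```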

This is what you can do **now** and what I think makes sense. 

However, if you want to turn it into something with a UI, being able to say 
"re-publish all the events from the past, using **some criteria**, and trigger 
all the Dags that have not been triggered by those events" (which is what I 
understand from your description) - then you are welcome to propose an Airflow 
Improvement Proposal, as for any other big new feature.

But I am afraid you have a vastly oversimplified view of how assets, triggering 
and events work in Airflow. You limited your case to a single dag producing a 
single event that triggers another Dag. This is but a fraction of what Airflow 
assets and events do. 

Currently, events in Airflow do not have "state" -> they get triggered, and they 
trigger whatever is currently subscribed to them according to its condition - 
which might be arbitrarily complex. In many cases those events will simply be 
archived very quickly - in big systems there might be thousands of events per 
minute - so currently, once an event is done, it is mostly used for the UI. But 
you seem to want to persistently store the state of events over history - 
basically tracking all the events triggered and evaluating triggering conditions 
over time.

You likely completely missed this chapter: 
https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/asset-scheduling.html#advanced-asset-scheduling-with-conditional-expressions
 . Currently the whole design of assets is based on the notion that a Dag simply 
"waits" for the triggering condition (which might be arbitrarily complex and 
contain logical operations): it accumulates incoming events as they are coming, 
calculates the condition based on those events, and "fires" the downstream dag 
when the condition is met. The only state that is stored is the "current input 
events state for the dag" - and it is just "current". We do not keep or evaluate 
a history of it; the condition is evaluated looking at what happened since the 
last time the Dag was run.
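The accumulate-and-fire behaviour described above can be pictured with a toy model - this is not Airflow internals, just an illustration of the mechanism under an assumed AND condition:

```python
# Toy model (NOT Airflow internals) of "wait for the triggering
# condition": accumulate incoming asset events, evaluate the boolean
# condition on the accumulated set, fire and reset when it holds.


def make_waiter(condition):
    seen = set()  # the only state: current input events since last run

    def on_event(asset):
        seen.add(asset)
        if condition(seen):
            seen.clear()  # state resets once the downstream dag fires
            return "fire"
        return "wait"

    return on_event


# Condition: a AND b (real conditions can be arbitrarily complex).
on_event = make_waiter(lambda s: {"a", "b"} <= s)
print(on_event("a"))  # → wait
print(on_event("b"))  # → fire
```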

For example, if the same event is generated several times but the downstream dag 
waits for another condition to be received (with some boolean logic applied), 
the waiting dag basically "discards" those duplicated events, treating them as a 
"single" event. So if you want to add a new Dag with some conditions, you would 
not only have to find past matching events - you would basically have to replay 
the whole history of those events, track the status of the logical conditions 
and how they changed over time, and determine how many times you would need to 
trigger the new Dag - basically simulating all the events that were triggered in 
the past.
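This is why counting raw past events is not enough - a small illustrative simulation (again a toy model, with an assumed AND condition) shows duplicates collapsing while the condition waits:

```python
# Toy illustration of duplicate-event handling: while an AND condition
# waits for its other operand, repeated events for the same asset
# collapse into one "seen" entry - so only replaying the full history
# reveals how many times the downstream dag would actually have fired.


def simulate(history, condition):
    seen, fires = set(), 0
    for asset in history:
        seen.add(asset)  # duplicates collapse here
        if condition(seen):
            fires += 1
            seen.clear()
    return fires


cond = lambda s: {"a", "b"} <= s
# "a" arrives three times before "b": still only one downstream run.
print(simulate(["a", "a", "a", "b"], cond))  # → 1
print(simulate(["a", "b", "a", "a", "b"], cond))  # → 2
```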

Possibly, you could use QueuedEvents, but this is not a question of "tweaking 
an API" - it is designing and implementing a huge new feature, considering data 
usage, consequences, storage mechanisms and persistence of the data, as well as 
the performance impact of such re-evaluations.

It's not a backfill, nor catch-up (those are simple because they act on data 
intervals, not on events and conditions that can be arbitrarily complex). And it 
certainly won't use the backfill UI - because the implementation and mechanism 
of backfill won't be usable for it, pretty much at all. So when you prepare your 
proposal it **might have** a similar UI - depending on the kind of criteria you 
want to apply to past events. 

But this is a completely new feature - so, similarly to what the Backfill 
proposal did 
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-78+Scheduler-managed+backfill
 - such a document will have to be created and discussed on the devlist; if 
there is feedback, the feedback should be applied by you, and then the final 
proposal should be voted on the devlist - same as Scheduler Managed Backfills 
were. If you get a successful vote, you might proceed to implementing it.

But my suggestion - if you decide to pursue it - please, please, please stop 
confusing it with backfill or catch-up. Come up with another name. Backfill is 
something that has a very concrete meaning in Airflow, and its roots are in 
data intervals -> so everyone will be completely confused if you keep naming it 
"backfill".

What you **really** want is an interface that tracks the history of asset 
events and allows replaying that history - re-evaluating the conditions that 
trigger downstream dags and re-triggering them according to the history of what 
happened.

There is a somewhat related discussion 
https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-93+Asset+Watermarks+and+State+Variables
 -> asset watermarks - which also talks about some event state that is likely 
related. The thing here is that if you want to do something like that, you also 
have to store some state about the events - a bit different, but related. So 
discussion of those new proposals should likely be somehow coordinated.

GitHub link: 
https://github.com/apache/airflow/discussions/59886#discussioncomment-15383235
