GitHub user hnomichith added a comment to the discussion: Add the ability to 
backfill a DAG based on past Asset Events

> In this case you just start a New dag. It does not "catch up" with anything. 
> Because it will be created NOW not in the past.

Let's not get hung up on semantics. From the beginning of your answer, it 
seems neither "backfill" nor "catch-up" is the right terminology.

> And it makes absolutely no sense to catch-up old dags if you do not not want 
> to trigger downstream dags.

I think you are misunderstanding here: I don't want to "catch up old dags" 
without "triggering downstream dags".

Let's go back to my example with DAG_1, DAG_2, DAG_3. DAG_1 creates some data 
at an irregular frequency, and that data must then be processed by DAG_2 and 
DAG_3. This seems to be EXACTLY the use case Airflow Assets are meant for: 
DAG_1 updates an Airflow asset, and that update triggers DAG_2 and DAG_3. 
Don't you agree? Do you still think using assets here is strange?
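
To make this concrete, here is a minimal sketch of that setup using the 
Airflow 3 SDK (the asset name and task bodies are illustrative):

```python
from airflow.sdk import Asset, dag, task

# Illustrative asset representing the data DAG_1 produces.
my_data = Asset("my_data")

@dag(schedule=None)  # DAG_1 runs on demand, at an irregular frequency
def DAG_1():
    @task(outlets=[my_data])
    def produce():
        ...  # write the data; completing this task emits an asset event

    produce()

@dag(schedule=[my_data])  # DAG_2 runs whenever my_data is updated
def DAG_2():
    @task
    def process():
        ...  # consume the data

    process()

DAG_1()
DAG_2()
```

DAG_3 would be declared exactly like DAG_2, with `schedule=[my_data]`.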

The issue is that those DAGs are not necessarily created at the same time. 
When creating DAG_2, I want it to process the updates that already happened on 
the data. Later, when creating DAG_3, I also want it to process that data.

Does it still make absolutely no sense to trigger DAG_2 or DAG_3 
independently, without having to re-trigger DAG_1?

Concretely, if there were 5 asset events in the past, meaning my data had 5 
updates, I want to trigger 5 new DAG runs, NOW, to process that data with 
DAG_2. I don't care that the DAG run's logical date is now; I only care about 
the data I'm processing.
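
Expressed as pseudo-code (both helpers are hypothetical; their bodies are 
exactly the feature that is missing today):

```python
def list_asset_events(asset: str) -> list:  # hypothetical helper
    """Return past asset events for `asset` (e.g. via GET /api/v2/assets/events)."""
    raise NotImplementedError

def trigger_asset_run(dag_id: str, asset_event) -> None:  # hypothetical helper
    """Create a DAG run NOW whose triggering asset event is `asset_event`."""
    raise NotImplementedError

for event in list_asset_events("my_data"):  # the 5 past updates
    trigger_asset_run("DAG_2", event)       # 5 new runs, created NOW
```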

What would you call this? Is it such an exotic or wrong use case?

> You can "clear" past run, yes, but you cannot create a past-time asset 
> triggered dag run unless you specify time in asset-specific way.

To reformulate: I want to create, NOW, an asset-triggered DAG run, giving it a 
past asset event as input.

> > /api/v2/dags/:dag_id/assets/queuedEvents
>
> I have no idea what that event would do in term of the "catchup" described 
> above.

Let's say a run of DAG_1 creates an Asset Event with ID 1.

At that moment, if DAG_2 is unpaused, a DAG run for DAG_2 is queued with the 
created event (that is what I understand from [this 
code](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/assets/manager.py#L279)). 
Thanks to this, the scheduler is able to create the DAG run (again, what I 
understand from [this 
code](https://github.com/apache/airflow/blob/c8aa74a19e39be75800ebd13289bf0713b9718b4/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L1728)).

However, if DAG_2 is paused, or doesn't exist yet at that moment, then nothing 
gets queued for it.
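
Schematically, my reading of the linked manager code is the following (a 
paraphrase of my understanding, not the actual Airflow source):

```python
# Paraphrase, not actual Airflow code.
def register_asset_change(asset_event, consuming_dags):
    for dag in consuming_dags:
        if dag.is_active and not dag.is_paused:
            # Queue the event; the scheduler later turns it into a DAG run.
            queue_event(dag.dag_id, asset_event)
        # else: the event is never queued for this DAG, so a paused or
        # not-yet-created consumer silently misses it.
```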

To circumvent this, manually calling `POST 
/api/v2/dags/DAG_2/assets/queuedEvents` with the Asset Event of ID 1 would 
simulate that the DAG wasn't paused when the event happened.

Ultimately, this endpoint would allow DAG_2 to process the data it wasn't able 
to process because it was paused, without having to touch DAG_1.
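
For illustration, the proposed call could look like this (the POST variant and 
its payload shape are the proposal itself, they don't exist today; the URL and 
auth header are placeholders):

```python
import requests

# Proposed endpoint: this POST does not exist today; the body is my guess.
resp = requests.post(
    "http://localhost:8080/api/v2/dags/DAG_2/assets/queuedEvents",
    headers={"Authorization": "Bearer <token>"},  # placeholder auth
    json={"asset_event_id": 1},  # the past event to queue for DAG_2
)
resp.raise_for_status()
```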

> Probably what you want:
> [...]

From your second message, I really think you are misunderstanding my use case, 
probably because I used the terms "backfill" and "catch-up", which confused 
you. I hope the beginning of this message gives you a better understanding.

GitHub link: 
https://github.com/apache/airflow/discussions/59886#discussioncomment-15382328
