Thanks Ash. It is nice that this design will have next-to-no impact on
scheduling performance.

One quick question: when you say "DB as a queue", will it leverage the SKIP
LOCKED feature of the database?
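
For context, the sort of pattern I have in mind is roughly the following
(purely illustrative; the table name and connection string are placeholders,
not anything from the AIP):

```
# Purely illustrative: "pending_dataset_events" is a placeholder name, and this
# assumes a Postgres-style dialect that supports FOR UPDATE SKIP LOCKED.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://airflow@localhost/airflow")  # placeholder DSN

with engine.begin() as conn:
    # SKIP LOCKED lets concurrent schedulers each claim a disjoint batch of
    # rows instead of blocking on rows another transaction already locked.
    rows = conn.execute(text(
        "SELECT dag_id, task_id, run_id, dataset_uri "
        "FROM pending_dataset_events "
        "LIMIT 100 FOR UPDATE SKIP LOCKED"
    )).fetchall()
```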

Thanks,

Ping

On Wed, Jun 8, 2022 at 9:59 AM Ash Berlin-Taylor <[email protected]> wrote:

> And the reason for including dag_id/task_id in that new table is to avoid
> a new "top level query": instead we can add it in to the existing
> "look at dagrun X" scheduling.
>
> (Strictly speaking we could just have a table of "dataset X has just been
> published", but for that to work we would need a new "top level" section of
> the main scheduler loop, which adds extra cost even when nothing is to be
> done.)
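>
> (Purely to illustrate the shape of the difference, with placeholder names
> rather than real Airflow code:)
>
> ```
> # Placeholder names only. With dag_id/run_id on the row, the lookup
> # piggybacks on the DagRun the scheduler is already examining:
> events = session.query(PendingDatasetEvent).filter_by(
>     dag_id=dag_run.dag_id, run_id=dag_run.run_id
> ).all()
>
> # Without those columns we would need a new top-level scan on every
> # scheduler loop, even when no datasets were published:
> events = session.query(PendingDatasetEvent).all()
> ```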
>
> -ash
>
> On Wed, Jun 8 2022 at 17:23:21 +0100, Ash Berlin-Taylor <[email protected]>
> wrote:
>
> Hi Ping,
>
> Good idea.
>
> Very roughly:
>
> We will create a new database table to store a queue of dataset publishes
> to be actioned, with columns (dag_id, task_id, run_id, dataset_uri). Rows
> in that table are created by the TaskInstance in the same transaction where
> that TI is marked as SUCCESS (this is important so that we don't "miss" any
> events).
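>
> To make that concrete, a very rough sketch of what such a table could look
> like as a SQLAlchemy definition (column names and sizes here are
> illustrative, not the final schema):
>
> ```
> # Illustrative sketch only, not the final schema.
> from sqlalchemy import Column, MetaData, String, Table
>
> metadata = MetaData()
>
> # One row per "this TI published this dataset" event. Rows are inserted in
> # the same transaction that marks the TaskInstance as SUCCESS, so an event
> # cannot be recorded without the TI succeeding (or vice versa).
> pending_dataset_events = Table(
>     "pending_dataset_events",
>     metadata,
>     Column("dag_id", String(250), primary_key=True),
>     Column("task_id", String(250), primary_key=True),
>     Column("run_id", String(250), primary_key=True),
>     Column("dataset_uri", String(3000), primary_key=True),
> )
> ```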
>
> The DagRuns for dataset-aware scheduling will be created from within the
> DagRun.update_state function of the _producing_ DagRun (which is called
> from either the mini scheduler in LocalTaskJob, or the main scheduler
> loop): it will look at the pending rows, create any necessary DagRuns
> there, and then delete the pending_dataset_events rows.
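>
> In pseudo-code, the processing inside update_state would look roughly like
> this (all names here are placeholders, not the final implementation):
>
> ```
> # Rough sketch only. PendingDatasetEvent and dags_scheduled_on() are
> # placeholder names for "the new table" and "whatever resolves the DAGs
> # consuming a given dataset URI".
> def _create_dataset_triggered_dag_runs(dag_run, session):
>     events = (
>         session.query(PendingDatasetEvent)
>         .filter_by(dag_id=dag_run.dag_id, run_id=dag_run.run_id)
>         .all()
>     )
>     for event in events:
>         for consuming_dag in dags_scheduled_on(event.dataset_uri):
>             # Create the data-triggered DagRun for each consuming DAG.
>             consuming_dag.create_dagrun(run_type="dataset_triggered", session=session)
>         # Delete in the same transaction so each event is handled exactly once.
>         session.delete(event)
> ```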
>
> We need to store these in the DB as a queue since there can be multiple
> places where the code could run, and we don't want to lose events; i.e. a
> simple "last_updated" column on the row in the "dataset" DB table is not
> enough, as it's possible for two TIs from different DagRuns to complete at
> almost the same instant, and we don't want to miss creating a
> data-scheduled DagRun.
>
> The advantage of doing it this way is that there is next-to-no impact on
> scheduling performance if you don't have a task that produces datasets.
>
> Does that give you enough of an idea of how we plan to implement this?
>
> Thanks,
> Ash
>
>
> On Mon, Jun 6 2022 at 19:11:40 -0700, Ping Zhang <[email protected]> wrote:
>
> Hi Ash,
>
> Thanks for introducing the data-driven scheduling and thoughtful future
> work section in the AIP.
>
>
> Could you please add a section to talk about the high level/early tech
> design in the AIP? This is a new scheduling pattern that can have many
> implications, thus we (our team) would love to know more about how you will
> design and implement it.
>
>
> Thanks,
>
> Ping
>
>
> On Wed, Jun 1, 2022 at 9:34 AM Ash Berlin-Taylor <[email protected]> wrote:
>
>> Hi All,
>>
>> Now that Summit is over (well done all the speakers! The talks I've
>> caught so far have been great) I'm ready to push forward with Data Driven
>> Scheduling, and I would like to call for a vote on
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling
>>
>> The vote will last for 7 days, until 2022/06/07 at 16:30 UTC.
>>
>> (This is my +1 vote)
>>
>> I have just published updates to the AIP, hopefully to make the AIP
>> tighter in scope (and easier to implement too). The tl;dr of this AIP:
>>
>> - Add a concept of Dataset (which is a URI-parsable str. Airflow places
>> no meaning on what the URI contains/means/is - the "airflow:" scheme is
>> reserved)
>> - A task "produces" a dataset by a) having it in its outlets attribute,
>> and b) finishing with SUCCESS. (That is, Airflow doesn't know/care about
>> data transfer/SQL tables etc. The notion is purely conceptual.)
>> - A DAG says that it wants to be triggered when its dataset (or any of
>> its datasets) changes. When this happens the scheduler will create the dag
>> run.
>>
>> This is just a high level summary, please read the confluence page for
>> full details.
>>
>> We have already thought about lots of ways we can (and will) extend this
>> over time, detailed in the "Future work" section. Our goal with this
>> AIP is to build the kernel of Data-aware Scheduling that we can build on
>> over time.
>>
>> A teaser/example DAG that hopefully gives a clue as to what we are
>> talking about here:
>>
>> ```
>> import pandas as pd
>>
>> from airflow import dag, Dataset
>>
>>
>> dataset = Dataset("s3://s3_default@some_bucket/order_data")
>> @dag
>> def my_dag():
>>
>>     @dag.task(outlets=[dataset])
>>     def producer():
>>         # What this task actually does doesn't matter to Airflow: the simple
>>         # act of running to SUCCESS means the dataset is updated, and
>>         # downstream dags will get triggered
>>         ...
>>
>>     producer()
>>
>>
>> @dag(schedule_on=dataset)
>> def consuming_dag():
>>     @dag.task
>>     def consumer(uri):
>>         df = pd.read_csv(uri)
>>         print(f"Dataset had {len(df)} rows")
>>
>>     consumer(uri=dataset.uri)
>> ```
>>
>> If anyone has any changes you think are fundamental/foundational to the
>> core idea, you have 1 week to raise them :) (Names of parameters we can
>> easily change as we implement this.) Our desire is to get this written and
>> released in Airflow 2.4.
>>
>> Thanks,
>> Ash
>>
>>
