The way I've described it, it won't need any extra locking of rows, and will tie 
in to the existing lock on the (source) dag run row.
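
To make that concrete, here's a rough SQLAlchemy sketch of the behaviour I mean; 
the function name and the pending-rows handling are placeholders, not final code:

```
from airflow.models import DagRun


def process_dataset_publishes(session, dag_id, run_id):
    # The scheduler (or mini scheduler) already takes a
    # SELECT ... FOR UPDATE SKIP LOCKED lock on the producing dag run row.
    dag_run = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag_id, DagRun.run_id == run_id)
        .with_for_update(skip_locked=True)
        .one_or_none()
    )
    if dag_run is None:
        # Someone else holds the lock and will action the pending rows.
        return
    # The pending dataset-publish rows for this dag run are only ever read
    # and deleted while this lock is held, so they need no row locks of
    # their own.
    ...
```

The existing lock there is already taken with SKIP LOCKED by the scheduler, so the 
queue table itself doesn't need any skip-lock handling of its own.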

-Ash

On 8 June 2022 21:45:26 BST, Ping Zhang <[email protected]> wrote:
>Thanks Ash. It is nice that this design will have next-to-no impact on
>scheduling performance.
>
>One quick question: when you say "DB as a queue", will it leverage the skip
>lock feature of the db?
>
>Thanks,
>
>Ping
>
>On Wed, Jun 8, 2022 at 9:59 AM Ash Berlin-Taylor <[email protected]> wrote:
>
>> And the reason for including dag_id/task_id in that new table is to avoid
>> a new "top level query": instead we can hook in to the existing
>> "look at dagrun X" scheduling.
>>
>> (Strictly speaking we could just have a table of "dataset X has just been
>> published", but for that to work we would need a new "top level" section of
>> the main scheduler loop, which adds extra cost even when nothing is to be
>> done.)
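>>
>> Expressed as a rough query sketch (PendingDatasetEvent is just a
>> placeholder name here, not a final model):
>>
>> ```
>> # Inside the existing "look at dagrun X" scheduling: keyed on the dag run
>> # we already have in hand, so it costs nothing when there are no rows.
>> pending = (
>>     session.query(PendingDatasetEvent)
>>     .filter(
>>         PendingDatasetEvent.dag_id == dag_run.dag_id,
>>         PendingDatasetEvent.run_id == dag_run.run_id,
>>     )
>>     .all()
>> )
>>
>> # vs. a new top-level scan the scheduler would have to run on every loop,
>> # even when no datasets were published at all:
>> # pending = session.query(PendingDatasetEvent).all()
>> ```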
>>
>> -ash
>>
>> On Wed, Jun 8 2022 at 17:23:21 +0100, Ash Berlin-Taylor <[email protected]>
>> wrote:
>>
>> Hi Ping,
>>
>> Good idea.
>>
>> Very roughly:
>>
>> We will create and use a new database table to store a queue of dataset
>> publishes to be actioned, with columns (dag_id, task_id, run_id,
>> dataset_uri). Rows in that table are created by TaskInstance in the same
>> transaction where that TI is marked as SUCCESS. (This is important so that
>> we don't "miss" any publishes.)
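>>
>> In rough SQLAlchemy terms, something like this; the model/column names and
>> sizes are placeholders, not final:
>>
>> ```
>> from sqlalchemy import Column, String
>>
>> from airflow.models.base import Base
>>
>>
>> class PendingDatasetEvent(Base):  # placeholder name
>>     __tablename__ = "pending_dataset_events"
>>
>>     dag_id = Column(String(250), primary_key=True)
>>     task_id = Column(String(250), primary_key=True)
>>     run_id = Column(String(250), primary_key=True)
>>     dataset_uri = Column(String(3000), primary_key=True)
>>
>>
>> # In TaskInstance, in the *same* transaction/session that marks the TI as
>> # SUCCESS -- no separate commit, so the rows can't be written without the
>> # state change (or vice versa).
>> def record_dataset_publishes(ti, session):
>>     for dataset in ti.task.outlets:
>>         session.add(
>>             PendingDatasetEvent(
>>                 dag_id=ti.dag_id,
>>                 task_id=ti.task_id,
>>                 run_id=ti.run_id,
>>                 dataset_uri=dataset.uri,
>>             )
>>         )
>> ```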
>>
>> The DagRuns for dataset-aware scheduling will be created from within the
>> DagRun.update_state function of the _producing_ DagRun (which is called
>> from either the mini scheduler in LocalTaskJob, or the main scheduler
>> loop): it will look at the pending rows, create any necessary DagRuns
>> there, and then delete the pending_dataset_events rows.
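>>
>> As a sketch, building on the PendingDatasetEvent placeholder above
>> (dags_scheduled_on and the function name are also placeholders):
>>
>> ```
>> from airflow.utils import timezone
>> from airflow.utils.state import DagRunState
>>
>>
>> # Called from within DagRun.update_state of the *producing* dag run, i.e.
>> # from the mini scheduler in LocalTaskJob or the main scheduler loop.
>> def create_dataset_triggered_runs(dag_run, session):
>>     pending = (
>>         session.query(PendingDatasetEvent)
>>         .filter(
>>             PendingDatasetEvent.dag_id == dag_run.dag_id,
>>             PendingDatasetEvent.run_id == dag_run.run_id,
>>         )
>>         .all()
>>     )
>>     for event in pending:
>>         # dags_scheduled_on() stands for "every DAG whose schedule_on
>>         # includes this dataset URI".
>>         for dag in dags_scheduled_on(event.dataset_uri):
>>             dag.create_dagrun(
>>                 state=DagRunState.QUEUED,
>>                 execution_date=timezone.utcnow(),
>>                 run_id=f"dataset_triggered__{event.dataset_uri}",
>>                 external_trigger=False,
>>                 session=session,
>>             )
>>         # Remove the queue row in the same transaction so it is only ever
>>         # actioned once.
>>         session.delete(event)
>> ```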
>>
>> We need to store these in the DB as a queue since there can be multiple
>> places where the code could run, and we don't want to lose events. I.e. a
>> simple "last_updated" on the row in the "dataset" DB table is not enough,
>> as it's possible two TIs from different DagRuns could complete at almost
>> the same instant, and we don't want to miss creating a data-scheduled
>> DagRun.
>>
>> The advantage of doing it this way is that there is next-to-no impact on
>> scheduling performance if you don't have a task that produces datasets.
>>
>> Does that give you enough of an idea as to how we plan on implementing this?
>>
>> Thanks,
>> Ash
>>
>>
>> On Mon, Jun 6 2022 at 19:11:40 -0700, Ping Zhang <[email protected]> wrote:
>>
>> Hi Ash,
>>
>> Thanks for introducing the data-driven scheduling and thoughtful future
>> work section in the AIP.
>>
>>
>> Could you please add a section to talk about the high level/early tech
>> design in the AIP? This is a new scheduling pattern that can have many
>> implications, thus we (our team) would love to know more about how you will
>> design and implement it.
>>
>>
>> Thanks,
>>
>> Ping
>>
>>
>> On Wed, Jun 1, 2022 at 9:34 AM Ash Berlin-Taylor <[email protected]> wrote:
>>
>>> Hi All,
>>>
>>> Now that Summit is over (well done all the speakers! The talks I've
>>> caught so far have been great) I'm ready to push forward with Data Driven
>>> Scheduling, and I would like to call for a vote on
>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling
>>>
>>> The vote will last for 7 days, until 2022/06/07 at 16:30 UTC.
>>>
>>> (This is my +1 vote)
>>>
>>> I have just published updates to the AIP, hopefully to make the AIP
>>> tighter in scope (and easier to implement too). The tl;dr of this AIP:
>>>
>>> - Add a concept of Dataset (which is a uri-parsable str. Airflow places
>>> no meaning on what the URI contains/means/is - "airflow:" scheme is
>>> reserved)
>>> - A task "produces" a dataset by a) having it in its outlets attribute,
>>> and b) finishing with SUCCESS. (That is, Airflow doesn't know/care about
>>> data transfer/SQL tables etc. It is just conceptual.)
>>> - A DAG says that it wants to be triggered when its dataset (or any of
>>> its datasets) changes. When this happens the scheduler will create the dag
>>> run.
>>>
>>> This is just a high level summary, please read the confluence page for
>>> full details.
>>>
>>> We have already thought about lots of ways we can (and will) extend this
>>> over time, detailed in the "Future work" section. Our goal with this
>>> AIP is to build the kernel of Data-aware Scheduling that we can build on
>>> over time.
>>>
>>> A teaser/example DAG that hopefully gives a clue as to what we are
>>> talking about here:
>>>
>>> ```
>>> import pandas as pd
>>>
>>> from airflow import dag, Dataset
>>>
>>>
>>> dataset = Dataset("s3://s3_default@some_bucket/order_data")
>>>
>>>
>>> @dag
>>> def my_dag():
>>>
>>>     @dag.task(outlets=[dataset])
>>>     def producer():
>>>         # What this task actually does doesn't matter to Airflow: the
>>>         # simple act of running to SUCCESS means the dataset is updated,
>>>         # and downstream dags will get triggered
>>>         ...
>>>
>>>     producer()
>>>
>>>
>>> dataset = Dataset("s3://s3_default@some_bucket/order_data")
>>>
>>>
>>> @dag(schedule_on=dataset)
>>> def consuming_dag():
>>>
>>>     @dag.task
>>>     def consumer(uri):
>>>         df = pd.read_from_s3(uri)  # pseudo-code: load the data however you like
>>>         print(f"Dataset had {len(df)} rows")
>>>
>>>     consumer(uri=dataset.uri)
>>> ```
>>>
>>> If anyone has any changes you think are fundamental/foundational to the
>>> core idea, you have 1 week to raise them :) (Names of parameters we can
>>> easily change as we implement this.) Our desire is to get this written and
>>> released in Airflow 2.4.
>>>
>>> Thanks,
>>> Ash
>>>
>>>
