The way I've described it, it won't need any extra locking of rows and will tie in to the existing lock on the (source) dag run row.

-Ash
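(For illustration only: a minimal sketch, not Airflow's actual code, of the locking pattern being referred to here. Airflow's HA scheduler already selects dag_run rows with SELECT ... FOR UPDATE SKIP LOCKED, which is the "existing lock" mentioned above; the query shape below is an assumption for readability.)

```
# Illustrative sketch (not Airflow's implementation): examining a dag run
# under a row lock with SKIP LOCKED via SQLAlchemy. A dag_run row already
# locked by another scheduler is skipped rather than blocked on, so anything
# processed strictly "per locked dag run" needs no extra locking of its own.
from typing import Optional

from sqlalchemy.orm import Session

from airflow.models.dagrun import DagRun
from airflow.utils.state import DagRunState


def next_dag_run_to_examine(session: Session) -> Optional[DagRun]:
    return (
        session.query(DagRun)
        .filter(DagRun.state == DagRunState.RUNNING)
        .with_for_update(skip_locked=True)
        .first()
    )
```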
On 8 June 2022 21:45:26 BST, Ping Zhang <[email protected]> wrote:
>Thanks Ash. It is nice that this design will have next-to-no impact on
>scheduling performance.
>
>One quick question: when you say "DB as a queue", will it leverage the skip
>lock feature of the db?
>
>Thanks,
>
>Ping
>
>
>On Wed, Jun 8, 2022 at 9:59 AM Ash Berlin-Taylor <[email protected]> wrote:
>
>> And the reason for including dag_id/task_id in that new table is to avoid
>> a new "top level query"; instead we can add it in to the existing
>> "look at dagrun X" scheduling.
>>
>> (Strictly speaking we could just have a table of "dataset X has just been
>> published", but for that to work we would need a new "top level" section of
>> the main scheduler loop, which adds extra cost even when nothing is to be
>> done.)
>>
>> -ash
>>
>> On Wed, Jun 8 2022 at 17:23:21 +0100, Ash Berlin-Taylor <[email protected]>
>> wrote:
>>
>> Hi Ping,
>>
>> Good idea.
>>
>> Very roughly:
>>
>> We will create and use a new database table to store a queue of dataset
>> publishes to be actioned, with columns (dag_id, task_id, run_id,
>> dataset_uri). Rows in that table are created by TaskInstance in the same
>> transaction where that TI is marked as success. (This is important so that
>> we don't "miss" any events.)
>>
>> The DagRuns for dataset-aware scheduling will be created from within the
>> DagRun.update_state function of the _producing_ DagRun (which is called
>> from either the mini scheduler in LocalTaskJob or the main scheduler loop):
>> it will look at pending rows, create any necessary DagRuns there, and then
>> delete the pending_dataset_events row.
>>
>> We need to store these in the DB as a queue since there can be multiple
>> places where the code could run, and we don't want to lose events; i.e. a
>> simple "last_updated" on the row in the "dataset" DB table is not enough,
>> as it's possible two TIs from different DagRuns could complete at almost
>> the same instant, and we don't want to fail to create a data-scheduled
>> DagRun.
>>
>> The advantage of doing it this way is that there is next-to-no impact on
>> scheduling performance if you don't have a task that produces datasets.
>>
>> Does that give you enough of an idea as to how we plan on implementing
>> this?
>>
>> Thanks,
>> Ash
>>
>>
>> On Mon, Jun 6 2022 at 19:11:40 -0700, Ping Zhang <[email protected]> wrote:
>>
>> Hi Ash,
>>
>> Thanks for introducing the data-driven scheduling and the thoughtful
>> future work section in the AIP.
>>
>> Could you please add a section to the AIP about the high-level/early tech
>> design? This is a new scheduling pattern that can have many implications,
>> so we (our team) would love to know more about how you will design and
>> implement it.
>>
>> Thanks,
>>
>> Ping
>>
>>
>> On Wed, Jun 1, 2022 at 9:34 AM Ash Berlin-Taylor <[email protected]> wrote:
>>
>>> Hi All,
>>>
>>> Now that Summit is over (well done all the speakers! The talks I've
>>> caught so far have been great) I'm ready to push forward with Data Driven
>>> Scheduling, and I would like to call for a vote on
>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling
>>>
>>> The vote will last for 7 days, until 2022/06/07 at 16:30 UTC.
>>>
>>> (This is my +1 vote.)
>>>
>>> I have just published updates to the AIP, hopefully to make the AIP
>>> tighter in scope (and easier to implement too). The tl;dr of this AIP:
>>>
>>> - Add a concept of Dataset (which is a uri-parsable str. Airflow places
>>> no meaning on what the URI contains/means/is - the "airflow:" scheme is
>>> reserved).
>>> - A task "produces" a dataset by a) having it in its outlets attribute,
>>> and b) finishing with SUCCESS. (That is, Airflow doesn't know/care about
>>> data transfer/SQL tables etc. It is just conceptual.)
>>> - A DAG says that it wants to be triggered when its dataset (or any of
>>> its datasets) changes. When this happens the scheduler will create the
>>> dag run.
>>>
>>> This is just a high level summary; please read the confluence page for
>>> full details.
>>>
>>> We have already thought about lots of ways we can (and will) extend this
>>> over time, detailed in the "Future work" section. Our goal with this AIP
>>> is to build the kernel of Data-aware Scheduling that we can build on over
>>> time.
>>>
>>> A teaser/example DAG that hopefully gives a clue as to what we are
>>> talking about here:
>>>
>>> ```
>>> import pandas as pd
>>>
>>> from airflow import dag, Dataset
>>>
>>>
>>> dataset = Dataset("s3://s3_default@some_bucket/order_data")
>>>
>>>
>>> @dag
>>> def my_dag():
>>>
>>>     @dag.task(outlets=[dataset])
>>>     def producer():
>>>         # What this task actually does doesn't matter to Airflow; the
>>>         # simple act of running to SUCCESS means the dataset is updated,
>>>         # and downstream dags will get triggered.
>>>         ...
>>>
>>>     producer()
>>>
>>>
>>> @dag(schedule_on=dataset)
>>> def consuming_dag():
>>>
>>>     @dag.task
>>>     def consumer(uri):
>>>         df = pd.read_from_s3(uri)
>>>         print(f"Dataset had {df.count()} rows")
>>>
>>>     consumer(uri=dataset.uri)
>>> ```
>>>
>>> If anyone has any changes they think are fundamental/foundational to the
>>> core idea, you have 1 week to raise it :) (Names of parameters we can
>>> easily change as we implement this.) Our desire is to get this written
>>> and released in Airflow 2.4.
>>>
>>> Thanks,
>>> Ash
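To make the queue design described in the thread above easier to picture, here is a rough, hypothetical sketch of its two halves: writing a pending-event row in the same transaction that marks the TI as success, and draining those rows from the producing run's DagRun.update_state. Only the (dag_id, task_id, run_id, dataset_uri) column set comes from the description above; the model, table, and helper names are illustrative assumptions, not the AIP-48 implementation.

```
# Hypothetical sketch only; everything except the column set is invented for
# illustration and is not Airflow's actual code.
from sqlalchemy import Column, String
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class PendingDatasetEvent(Base):
    """One not-yet-actioned dataset publish, written by the producing TI."""

    __tablename__ = "pending_dataset_events"

    dag_id = Column(String(250), primary_key=True)
    task_id = Column(String(250), primary_key=True)
    run_id = Column(String(250), primary_key=True)
    dataset_uri = Column(String(3000), primary_key=True)


def record_dataset_publish(ti, dataset_uri: str, session: Session) -> None:
    # Runs in the SAME transaction that flips the TI to SUCCESS, so a publish
    # can never be recorded without the success, or vice versa.
    session.add(
        PendingDatasetEvent(
            dag_id=ti.dag_id,
            task_id=ti.task_id,
            run_id=ti.run_id,
            dataset_uri=dataset_uri,
        )
    )


def create_dataset_triggered_runs(dataset_uri: str, session: Session) -> None:
    # Placeholder for the real work: find DAGs scheduled on this dataset and
    # create a DagRun for each of them.
    ...


def drain_pending_events(dag_run, session: Session) -> None:
    # Conceptually called from DagRun.update_state of the *producing* run,
    # i.e. while that run's row lock is already held (mini scheduler or main
    # scheduler loop). Create the triggered runs, then delete the queue rows
    # in the same transaction so each event is handled exactly once.
    events = (
        session.query(PendingDatasetEvent)
        .filter(
            PendingDatasetEvent.dag_id == dag_run.dag_id,
            PendingDatasetEvent.run_id == dag_run.run_id,
        )
        .all()
    )
    for event in events:
        create_dataset_triggered_runs(event.dataset_uri, session)
        session.delete(event)
```

This also shows why, as noted in the thread, a simple "last_updated" timestamp on the dataset row would not be enough: two TIs publishing at almost the same instant each get their own queue row, so neither event can be lost.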
