Thanks, Ash. It is nice that this design will have next-to-no impact on scheduling performance.

One quick question: when you say "DB as a queue", will it leverage the skip lock feature from the db?

Thanks,
Ping

On Wed, Jun 8, 2022 at 9:59 AM Ash Berlin-Taylor <[email protected]> wrote:

> And the reason for including dag_id/task_id in that new table is to avoid
> a new "top level query", so that we can instead add it in to the existing
> "look at dagrun X" scheduling.
>
> (Strictly speaking we could just have a table of "dataset X has just been
> published", but for that to work we would need a new "top level" section of
> the main scheduler loop, which adds extra cost even when nothing is to be
> done.)
>
> -ash
>
> On Wed, Jun 8 2022 at 17:23:21 +0100, Ash Berlin-Taylor <[email protected]>
> wrote:
>
> Hi Ping,
>
> Good idea.
>
> Very roughly:
>
> We will create a new database table to store a queue of dataset publishes
> to be actioned, with columns (dag_id, task_id, run_id, dataset_uri). Rows
> in that table are created by the TaskInstance in the same transaction where
> that TI is marked as success (this is important so that we don't "miss"
> any events).
>
> The DagRuns for dataset-aware scheduling will be created from within the
> DagRun.update_state function of the _producing_ DagRun (which is called
> from either the mini scheduler in LocalTaskJob, or the main scheduler
> loop): it will look at pending rows, create any necessary DagRuns there,
> and then delete the pending_dataset_events rows.
>
> We need to store these in the DB as a queue since there can be multiple
> places where the code could run, and we don't want to lose events, i.e. a
> simple "last_updated" on the row in the "dataset" DB table is not enough,
> as it's possible two TIs from different DagRuns could complete at almost
> the same instant, and we don't want to fail to create a data-scheduled
> DagRun.
>
> The advantage of doing it this way is that there is next-to-no impact on
> scheduling performance if you don't have a task that produces datasets.
>
> Does that give you enough of an idea as to how we plan on implementing
> this?
>
> Thanks,
> Ash
>
>
> On Mon, Jun 6 2022 at 19:11:40 -0700, Ping Zhang <[email protected]> wrote:
>
> Hi Ash,
>
> Thanks for introducing data-driven scheduling and the thoughtful future
> work section in the AIP.
>
> Could you please add a section to talk about the high-level/early tech
> design in the AIP? This is a new scheduling pattern that can have many
> implications, thus we (our team) would love to know more about how you
> will design and implement it.
>
> Thanks,
>
> Ping
>
>
> On Wed, Jun 1, 2022 at 9:34 AM Ash Berlin-Taylor <[email protected]> wrote:
>
>> Hi All,
>>
>> Now that Summit is over (well done all the speakers! The talks I've
>> caught so far have been great) I'm ready to push forward with Data Driven
>> Scheduling, and I would like to call for a vote on
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling
>>
>> The vote will last for 7 days, until 2022/06/07 at 16:30 UTC.
>>
>> (This is my +1 vote)
>>
>> I have just published updates to the AIP, hopefully to make the AIP
>> tighter in scope (and easier to implement too). The tl;dr of this AIP:
>>
>> - Add a concept of Dataset (which is a uri-parsable str. Airflow places
>> no meaning on what the URI contains/means/is - the "airflow:" scheme is
>> reserved)
>> - A task "produces" a dataset by a) having it in its outlets attribute,
>> and b) finishing with SUCCESS. (That is, Airflow doesn't know/care about
>> data transfer/SQL tables etc. It is just conceptual.)
>> - A DAG says that it wants to be triggered when its dataset (or any of
>> its datasets) changes. When this happens the scheduler will create the
>> dag run.
>>
>> This is just a high level summary, please read the confluence page for
>> full details.
>>
>> We have already thought about lots of ways we can (and will) extend this
>> over time, detailed in the "Future work" section. Our goal with this AIP
>> is to build the kernel of Data-aware Scheduling that we can build on
>> over time.
>>
>> A teaser/example DAG that hopefully gives a clue as to what we are
>> talking about here:
>>
>> ```
>> import pandas as pd
>>
>> from airflow import dag, Dataset
>>
>>
>> dataset = Dataset("s3://s3_default@some_bucket/order_data")
>>
>>
>> @dag
>> def my_dag():
>>
>>     @dag.task(outlets=[dataset])
>>     def producer():
>>         # What this task actually does doesn't matter to Airflow; the
>>         # simple act of running to SUCCESS means the dataset is updated,
>>         # and downstream dags will get triggered
>>         ...
>>
>>     producer()
>>
>>
>> dataset = Dataset("s3://s3_default@some_bucket/order_data")
>>
>>
>> @dag(schedule_on=dataset)
>> def consuming_dag():
>>
>>     @dag.task
>>     def consumer(uri):
>>         df = pd.read_from_s3(uri)
>>         print(f"Dataset had {df.count()} rows")
>>
>>     consumer(uri=dataset.uri)
>> ```
>>
>> If anyone has any changes you think are fundamental/foundational to the
>> core idea you have 1 week to raise it :) (Names of parameters we can
>> easily change as we implement this.) Our desire is to get this written
>> and released in Airflow 2.4.
>>
>> Thanks,
>> Ash
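
A minimal sketch of the queue table and the "enqueue in the same transaction" step described in Ash's reply above, assuming SQLAlchemy (which Airflow already uses for its metadata DB). The table name, column sizes, and the mark_success_and_enqueue helper are illustrative only, not names from the AIP:

```
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class PendingDatasetEvent(Base):
    """One row per "dataset published" event that still needs to be actioned."""

    __tablename__ = "pending_dataset_event"  # illustrative name

    id = Column(Integer, primary_key=True, autoincrement=True)
    dag_id = Column(String(250), nullable=False)
    task_id = Column(String(250), nullable=False)
    run_id = Column(String(250), nullable=False)
    dataset_uri = Column(String(3000), nullable=False)


def mark_success_and_enqueue(
    session: Session, dag_id: str, task_id: str, run_id: str, outlets: list
) -> None:
    """Queue the TI's outlet datasets in the transaction that marks it SUCCESS.

    Sharing one transaction means either the task becomes SUCCESS *and* its
    dataset events are queued, or neither happens, so events cannot be missed.
    """
    # ... update the TaskInstance row to state=SUCCESS using this same session ...
    for uri in outlets:
        session.add(
            PendingDatasetEvent(
                dag_id=dag_id, task_id=task_id, run_id=run_id, dataset_uri=uri
            )
        )
    session.commit()  # one commit covers both the state change and the queued rows
```

Because each queued row carries dag_id/run_id, the scheduler can pick it up inside the per-DagRun scheduling it already does, which is the "no new top level query" point from Ash's first reply.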

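And a rough sketch, under the same assumptions, of how the producing DagRun could drain that queue from DagRun.update_state, using SELECT ... FOR UPDATE SKIP LOCKED (the "skip lock" feature Ping asks about, supported by PostgreSQL and MySQL 8+) so that the mini scheduler and the main scheduler never action the same event twice. dags_scheduled_on and queue_dag_run are placeholders, not real Airflow APIs, and PendingDatasetEvent is the illustrative model from the sketch above:

```
from sqlalchemy import select
from sqlalchemy.orm import Session


def dags_scheduled_on(dataset_uri: str) -> list:
    """Placeholder: return dag_ids of DAGs declaring schedule_on for this dataset."""
    return []


def queue_dag_run(dag_id: str) -> None:
    """Placeholder: create a new DagRun for a consuming DAG."""


def create_dataset_triggered_runs(session: Session, dag_id: str, run_id: str) -> None:
    """Drain pending events for this producing DagRun and trigger consuming DAGs.

    Intended to be called from DagRun.update_state, so it piggybacks on work
    the scheduler (or the mini scheduler in LocalTaskJob) already does here.
    """
    events = (
        session.execute(
            select(PendingDatasetEvent)
            .where(
                PendingDatasetEvent.dag_id == dag_id,
                PendingDatasetEvent.run_id == run_id,
            )
            # SKIP LOCKED: if two schedulers reach this point at the same time,
            # whoever locks a row first handles it and the other skips it, so
            # nothing is lost and nothing is actioned twice.
            .with_for_update(skip_locked=True)
        )
        .scalars()
        .all()
    )

    for event in events:
        for consumer_dag_id in dags_scheduled_on(event.dataset_uri):
            queue_dag_run(consumer_dag_id)
        session.delete(event)  # actioned, so remove it from the queue

    session.commit()  # new runs and queue deletions land in one transaction
```

Deleting each row in the same transaction that creates the triggered runs is what makes the table behave as a queue rather than a flag: an event disappears exactly once, and only after it has been handled.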