And the reason for including dag_id/task_id in that new table is to
avoid a new "top level query"; instead we can add this in to the
existing "look at dagrun X" scheduling.
(Strictly speaking we could just have a table of "dataset X has just
been published", but for that to work we would need a new "top level"
section of the main scheduler loop, which adds extra cost even when
nothing is to be done.)
-ash
On Wed, Jun 8 2022 at 17:23:21 +0100, Ash Berlin-Taylor
<[email protected]> wrote:
Hi Ping,
Good idea.
Very roughly:
We will create and use a new database table to store a queue of
dataset publishes to be actioned, with columns (dag_id, task_id,
run_id, dataset_uri). Rows in that table are created by TaskInstance
in the same transaction where that TI is marked as success (This is
important so that we don't "miss"
The creation of DagRuns for dataset-aware scheduling will run from
within the DagRun.update_state function of the _producing_ DagRun
(which is called from either the mini scheduler in LocalTaskJob, or
the main scheduler loop). It will look at pending rows, create any
necessary DagRuns there, and then delete the pending_dataset_events
row.
We need to store these in a DB as a queue since there can be multiple
places where the code could run, and we don't want to lose events,
i.e. a simple "last_updated" on the row in the "dataset" DB table is
not enough, as it's possible two TIs from different DagRuns could
complete at almost the same instant, and we don't want to miss
creating a data-scheduled DagRun.
The advantage of doing it this way is that there is next-to-no impact
on scheduling performance if you don't have a task that produces
datasets.
Does that give you enough of an idea as to how we plan on implementing
this?
Thanks,
Ash
On Mon, Jun 6 2022 at 19:11:40 -0700, Ping Zhang <[email protected]>
wrote:
Hi Ash,
Thanks for introducing the data-driven scheduling and thoughtful
future work section in the AIP.
Could you please add a section to talk about the high level/early
tech design in the AIP? This is a new scheduling pattern that can
have many implications, thus we (our team) would love to know more
about how you will design and implement it.
Thanks,
Ping
On Wed, Jun 1, 2022 at 9:34 AM Ash Berlin-Taylor <[email protected]>
wrote:
Hi All,
Now that Summit is over (well done all the speakers! The talks I've
caught so far have been great) I'm ready to push forward with Data
Driven Scheduling, and I would like to call for a vote on
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling>
The vote will last for 7 days, until 2022/06/07 at 16:30 UTC.
(This is my +1 vote)
I have just published updates to the AIP, hopefully to make the AIP
tighter in scope (and easier to implement too). The tl;dr of this
AIP:
- Add a concept of Dataset (which is a uri-parsable str. Airflow
places no meaning on what the URI contains/means/is - "airflow:"
scheme is reserved)
- A task "produces" a dataset by a) having it in its outlets
attribute, and b) finishing with SUCCESS. (That is, Airflow doesn't
know/care about data transfer/SQL tables etc. It is just
conceptual)
- A DAG says that it wants to be triggered when its dataset (or
any of its datasets) change. When this happens the scheduler will
create the dag run.
This is just a high level summary, please read the confluence page
for full details.
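To make the first bullet concrete, here is a rough sketch of what "a uri-parsable str with a reserved airflow: scheme" could look like. The DatasetSketch class is a hypothetical illustration, not the class proposed in the AIP, and using urllib.parse.urlsplit as the parser is an assumption.

```python
from urllib.parse import urlsplit


class DatasetSketch:
    """Hypothetical stand-in for a Dataset: just a validated URI string."""

    def __init__(self, uri: str):
        # urlsplit raises ValueError for some malformed URIs; otherwise the
        # URI is treated as opaque and no meaning is attached to its parts.
        scheme = urlsplit(uri).scheme
        if scheme.lower() == "airflow":
            raise ValueError("the 'airflow' scheme is reserved")
        self.uri = uri

    def __eq__(self, other):
        return isinstance(other, DatasetSketch) and self.uri == other.uri

    def __hash__(self):
        return hash(self.uri)
```

Comparing and hashing by URI alone means two DAG files that declare the same URI refer to the same dataset, which is what lets a producer in one DAG trigger a consumer in another.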
We have already thought about lots of ways we can (and will) extend
this over time, detailed in the "Future work" section. Our
goal with this AIP is to build the kernel of Data-aware Scheduling
that we can build on over time.
A teaser/example DAG that hopefully gives a clue as to what we are
talking about here:
```
import pandas as pd

from airflow import dag, Dataset

dataset = Dataset("s3://s3_default@some_bucket/order_data")


@dag
def my_dag():
    @dag.task(outlets=[dataset])
    def producer():
        # What this task actually does doesn't matter to Airflow,
        # the simple act of running to SUCCESS means the dataset
        # is updated, and downstream dags will get triggered
        ...


dataset = Dataset("s3://s3_default@some_bucket/order_data")


@dag(schedule_on=dataset)
def consuming_dag():
    @dag.task
    def consumer(uri):
        # Assumes CSV data; reading s3:// URIs with pandas requires s3fs
        df = pd.read_csv(uri)
        print(f" Dataset had {len(df)} rows")

    consumer(uri=dataset.uri)
```
If anyone has any changes you think are fundamental/foundational to
the core idea you have 1 week to raise them :) (Names of parameters
we can easily change as we implement this) Our desire is to get
this written and released in Airflow 2.4.
Thanks,
Ash