Hi Pablo,

Wow, I really love this idea. This will greatly enrich the Airflow ecosystem.

I agree with Ash: it is better to have it as a standalone package. We can also use this framework to write Airflow core invariant tests and run them on every Airflow release to make sure there are no regressions.

Thanks,
Ping

On Sun, Jul 17, 2022 at 1:09 PM Pablo Estrada <[email protected]> wrote:

> Understood!
>
> TL;DR: I propose a testing framework where users can check 'DAG execution invariants' or 'DAG execution expectations' given certain task outcomes.
>
> As DAGs grow in complexity, it can become difficult to reason about their runtime behavior across many scenarios. Users may want to lay out rules, in the form of tests, that verify DAG execution results. For example:
>
> - If any of my database_backup_* tasks fails, I want to ensure that at least one email_alert_* task will run.
> - If my 'check_authentication' task fails, I want to ensure that the whole DAG will fail.
> - If any of my DataflowOperator tasks fails, I want to ensure that a PubsubOperator downstream will always run.
>
> Checking these invariants doesn't require executing the DAG, yet they are pretty hard to test today: staging environments can't exercise every possible runtime outcome.
>
> In this framework, users would define unit tests like this:
>
> ```
> from airflow import models
> from airflow.operators.empty import EmptyOperator
> # assert_that, given, task, succeeds, runs and DEFAULT_DATE come from the
> # proposed framework and the test module (see the draft PR).
>
> def test_my_example_dag():
>     the_dag = models.DAG(
>         'the_basic_dag',
>         schedule_interval='@daily',
>         start_date=DEFAULT_DATE,
>     )
>
>     with the_dag:
>         op1 = EmptyOperator(task_id='task_1')
>         op2 = EmptyOperator(task_id='task_2')
>         op3 = EmptyOperator(task_id='task_3')
>
>         op1 >> op2 >> op3
>
>     # DAG invariant: if task_1 and task_2 succeed, then task_3 will always run
>     assert_that(
>         given(the_dag)\
>             .when(task('task_1'), succeeds())\
>             .and_(task('task_2'), succeeds())\
>             .then(task('task_3'), runs()))
> ```
>
> This is a very simple example, and not a great one because it only duplicates the DAG logic, but you can see more examples in my draft PR <https://github.com/apache/airflow/pull/25112/files#diff-b1f30afa38d247f9204790392ab6888b04288603ac4d38154d05e6c5b998cf85R28-R82> [1] and in my draft AIP <https://docs.google.com/document/d/1priak1uiJTXP1F9K5B8XS8qmeRbJ8trYLvE4k2aBY5c/edit#heading=h.atmk0p7fmv7g> [2].
>
> I started writing up an AIP in a Google doc [2] which y'all can check. It's very close to what I have written here :)
>
> LMK what y'all think. I am also happy to publish this as a separate library if y'all wanna be cautious about adding it directly to Airflow.
> -P.
>
> [1] https://github.com/apache/airflow/pull/25112/files#diff-b1f30afa38d247f9204790392ab6888b04288603ac4d38154d05e6c5b998cf85R28-R82
> [2] https://docs.google.com/document/d/1priak1uiJTXP1F9K5B8XS8qmeRbJ8trYLvE4k2aBY5c/edit#
>
>
> On Sun, Jul 17, 2022 at 2:13 AM Jarek Potiuk <[email protected]> wrote:
>
>> Yep. Just outline your proposal on devlist, Pablo :).
>>
>> On Sun, Jul 17, 2022 at 10:35 AM Ash Berlin-Taylor <[email protected]> wrote:
>> >
>> > Hi Pablo,
>> >
>> > Could you describe at a high level what you are thinking of? It's entirely possible it doesn't need any changes to core Airflow, or isn't significant enough to need an AIP.
>> >
>> > Thanks,
>> > Ash
>> >
>> > On 17 July 2022 07:43:54 BST, Pablo Estrada <[email protected]> wrote:
>> >>
>> >> Hi there!
>> >> I would like to start a discussion about an idea I had for a testing framework for Airflow.
>> >> I believe the first step would be to write up an AIP, so could I have access to write a new one on the cwiki?
>> >>
>> >> Thanks!
>> >> -P.
>> >
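To make the proposal concrete, here is a minimal, self-contained sketch of how invariants like the ones Pablo lists could be checked without running the DAG. It is not the API from the draft PR and does not use Airflow: `ToyDag`, `simulate`, `runs` and `holds_for_all_outcomes` are hypothetical names. Only two trigger rules are modeled, `all_success` (Airflow's default) and `one_failed`, with simplified semantics. The checker enumerates every success/failure combination of the tasks, which covers outcomes a staging run would miss, at a cost exponential in the number of tasks.

```python
from dataclasses import dataclass, field
from itertools import product
from typing import Callable, Dict, List

# Hypothetical sketch: not the draft PR's API and not Airflow code.
SUCCESS, FAILED = "success", "failed"
UPSTREAM_FAILED, SKIPPED = "upstream_failed", "skipped"
SUPPORTED_RULES = ("all_success", "one_failed")


@dataclass
class ToyDag:
    # task_id -> upstream task_ids; keys must be in topological order.
    upstream: Dict[str, List[str]]
    # task_id -> trigger rule; tasks not listed default to "all_success".
    trigger_rule: Dict[str, str] = field(default_factory=dict)


def simulate(dag: ToyDag, outcomes: Dict[str, str]) -> Dict[str, str]:
    """Derive each task's final state; `outcomes` says how a task ends if it executes."""
    states: Dict[str, str] = {}
    for task_id, ups in dag.upstream.items():
        rule = dag.trigger_rule.get(task_id, "all_success")
        if rule not in SUPPORTED_RULES:
            raise ValueError(f"unsupported trigger rule: {rule}")
        up = [states[u] for u in ups]
        any_failed = any(s in (FAILED, UPSTREAM_FAILED) for s in up)
        if rule == "all_success" and any_failed:
            states[task_id] = UPSTREAM_FAILED
        elif rule == "all_success" and SKIPPED in up:
            states[task_id] = SKIPPED
        elif rule == "one_failed" and not any_failed:
            states[task_id] = SKIPPED
        else:
            states[task_id] = outcomes[task_id]  # the task executes
    return states


def runs(task_id: str) -> Callable[[Dict[str, str]], bool]:
    # A task "runs" if it actually executed, whether it succeeded or failed.
    return lambda states: states[task_id] in (SUCCESS, FAILED)


def holds_for_all_outcomes(dag: ToyDag, given, expect) -> bool:
    """Check `expect(states)` in every scenario whose final states satisfy `given`.

    Scenarios are all success/failure combinations of all tasks, so this is
    only practical for small DAGs.
    """
    tasks = list(dag.upstream)
    for combo in product((SUCCESS, FAILED), repeat=len(tasks)):
        states = simulate(dag, dict(zip(tasks, combo)))
        if given(states) and not expect(states):
            return False
    return True


# Linear example: if task_1 and task_2 succeed, task_3 always runs.
linear = ToyDag(upstream={"task_1": [], "task_2": ["task_1"], "task_3": ["task_2"]})
assert holds_for_all_outcomes(
    linear,
    given=lambda s: s["task_1"] == SUCCESS and s["task_2"] == SUCCESS,
    expect=runs("task_3"),
)


# If any database_backup_* task fails, at least one email_alert_* task runs.
def any_backup_failed(s: Dict[str, str]) -> bool:
    return any(st == FAILED for t, st in s.items() if t.startswith("database_backup_"))


def some_alert_runs(s: Dict[str, str]) -> bool:
    return any(runs(t)(s) for t in s if t.startswith("email_alert_"))


backup_upstream = {
    "database_backup_a": [],
    "database_backup_b": [],
    "email_alert_1": ["database_backup_a", "database_backup_b"],
}
good = ToyDag(backup_upstream, trigger_rule={"email_alert_1": "one_failed"})
bad = ToyDag(backup_upstream)  # all_success: a failed backup marks the alert upstream_failed
assert holds_for_all_outcomes(good, any_backup_failed, some_alert_runs)
assert not holds_for_all_outcomes(bad, any_backup_failed, some_alert_runs)
```

The last assertion shows the kind of mistake such tests would catch: leaving the alert task on the default trigger rule silently breaks the backup invariant, even though every individual task looks correct.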
