Hey everyone,

I have a small proposal for adding a new overloaded operator (initially
proposed as ">>=", plus an equivalent method). It comes out of our discussion
on AIP-47, the system tests refactoring
(https://lists.apache.org/thread/htd4013yn483qfhwv11vc26jpf2yvjph).

The ">>=" operator is not strictly necessary to complete AIP-47 (we can work
around it with somewhat complex pre-commit checks), but it might simplify how
we turn example DAGs into system tests and make them more maintainable. It
might also simplify "real" DAGs for some users.

Context:

In AIP-47 we need "status"-like functionality in system tests. The basic
idea is to keep all the "test code" in a single file: the example DAG. This
way each system test (currently spread among DAG files, pytest tests and
configuration) will "shrink" to a single "example_dag" file. This is a great
simplification and will be extremely helpful for system tests, but it means
we need "status"-like functionality in the DAGs that fails the DAG run if any
of the tasks failed during execution.

In a number of DAGs we have to run a "cleanup/tearDown" task as the last
step, no matter whether any of the tasks failed (this is easily done with an
"all_done" trigger rule on a leaf task). But then that cleanup task
determines the status of the DAG run ("failed" or "succeeded"), so a failure
in an earlier task is masked whenever the cleanup succeeds. So we need to
bypass the "cleanup" task and, if any task fails, run a "watcher" task (as we
named it) as a leaf node that determines the status of the whole DAG.
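To make the masking problem concrete, here is a small pure-Python toy model. It is not Airflow code and is deliberately simplified: tasks carry a trigger rule, and the run state is decided by the leaf tasks, which is roughly how Airflow decides it. The task names (`create_bucket`, `run_query`, `delete_bucket`) and the `raises` flag are hypothetical. In a real DAG the watcher would be something like a `@task(trigger_rule=TriggerRule.ONE_FAILED)` function that raises `AirflowException`.

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Toy model only (not Airflow's scheduler): states that count as failure.
FAILED = {"failed", "upstream_failed"}


@dataclass(eq=False)
class Task:
    task_id: str
    trigger_rule: str = "all_success"
    raises: bool = False  # hypothetical flag: the task body fails when it runs
    upstream: List["Task"] = field(default_factory=list)
    downstream: List["Task"] = field(default_factory=list)
    state: Optional[str] = None

    def __rshift__(self, other: "Task") -> "Task":
        # Like Airflow's `a >> b`: b runs after a.
        self.downstream.append(other)
        other.upstream.append(self)
        return other


def resolve(task: Task) -> str:
    """Final state of a task given its upstream states (simplified rules)."""
    any_failed = any(t.state in FAILED for t in task.upstream)
    if task.trigger_rule == "all_success" and any_failed:
        return "upstream_failed"
    if task.trigger_rule == "one_failed" and not any_failed:
        return "skipped"  # nothing failed, so the watcher never runs
    # "all_done" (and the remaining cases) run the task body.
    return "failed" if task.raises else "success"


def run_dag(tasks: List[Task]) -> str:
    """Run tasks in the given (topological) order; leaf tasks decide the run state."""
    for task in tasks:
        task.state = resolve(task)
    leaves = [t for t in tasks if not t.downstream]
    return "failed" if any(t.state in FAILED for t in leaves) else "success"


def build(task_fails: bool, with_watcher: bool) -> List[Task]:
    create = Task("create_bucket")
    work = Task("run_query", raises=task_fails)
    cleanup = Task("delete_bucket", trigger_rule="all_done")
    create >> work >> cleanup
    tasks = [create, work, cleanup]
    if with_watcher:
        # The watcher always raises when it runs; "one_failed" gates whether it runs.
        watcher = Task("watcher", trigger_rule="one_failed", raises=True)
        for task in tasks:
            task >> watcher
        tasks.append(watcher)
    return tasks


print(run_dag(build(task_fails=True, with_watcher=False)))  # success (failure masked)
print(run_dag(build(task_fails=True, with_watcher=True)))   # failed
print(run_dag(build(task_fails=False, with_watcher=True)))  # success
```

Without the watcher, the failed `run_query` is hidden behind the successful leaf `delete_bucket`. With it, the watcher is the only leaf: it fails when something upstream failed and is skipped when nothing did.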

Proposed Solution:

Maybe there is an easier way than what we came up with (happy to hear it),
but our idea is a "watcher" leaf task that has a "one_failed" trigger rule
and depends on all the tasks in the example DAG. As a result, all the more
complex example DAGs end up with:

[task1, task2, task3, ... ] >> watcher

This is a bit brittle because the list is long and you have to remember to
add every new task to it when the example DAGs change (and it's very easy to
miss one, as nothing visibly breaks if you do). We could automate the check
in pre-commits for example DAGs, but we could solve it more easily by adding
a new operator that makes every other task in the DAG upstream of the watcher
task (which carries the "one_failed" trigger rule):
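For comparison, the pre-commit route could look roughly like this sketch: given each task's direct downstream task ids, list the tasks that do not feed the watcher. It is plain Python; the function name `missing_watcher_deps` and the example task ids are hypothetical. With a loaded Airflow DAG the mapping could presumably be built as `{t.task_id: t.downstream_task_ids for t in dag.tasks}`.

```python
from typing import Dict, Iterable, List


def missing_watcher_deps(
    downstream: Dict[str, Iterable[str]], watcher_id: str = "watcher"
) -> List[str]:
    """Return the ids of tasks that are not directly upstream of the watcher."""
    return sorted(
        task_id
        for task_id, children in downstream.items()
        # The watcher itself is exempt; every other task must point at it.
        if task_id != watcher_id and watcher_id not in set(children)
    )


# Hypothetical example DAG where a newly added "delete_bucket" task
# was not wired to the watcher.
example = {
    "create_bucket": {"run_query", "watcher"},
    "run_query": {"delete_bucket", "watcher"},
    "delete_bucket": set(),
    "watcher": set(),
}
print(missing_watcher_deps(example))  # ['delete_bucket']
```

A hook would then fail whenever the returned list is non-empty; this is the kind of extra machinery the operator would make unnecessary.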

Pseudocode (I proposed the ">>=" operator, but there should also be an
equivalent "set_" method):

dag >>= watcher

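The operator, as a method on the DAG class (it has to return the DAG,
otherwise "dag >>= watcher" would rebind "dag" to None):

def __irshift__(self, watcher):
    for task in self.tasks:
        if task is not watcher:
            task >> watcher
    return self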
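To sanity-check the semantics without Airflow, here is a self-contained toy version. `Task` and `Dag` below are minimal stand-ins, not Airflow's classes. The sketch also skips tasks already wired to the watcher, so applying `>>=` twice does not add duplicate edges; that idempotence check is an extra assumption of this sketch, not part of the proposal.

```python
from typing import List


class Task:
    """Minimal stand-in for an Airflow operator (toy, not Airflow's class)."""

    def __init__(self, task_id: str, dag: "Dag") -> None:
        self.task_id = task_id
        self.downstream: List["Task"] = []
        dag.tasks.append(self)  # register with the DAG, like a DAG context would

    def __rshift__(self, other: "Task") -> "Task":
        self.downstream.append(other)
        return other


class Dag:
    def __init__(self) -> None:
        self.tasks: List[Task] = []

    def __irshift__(self, watcher: Task) -> "Dag":
        # Make every other task a direct upstream of the watcher.
        for task in self.tasks:
            if task is not watcher and watcher not in task.downstream:
                task >> watcher
        # Must return self: `dag >>= watcher` rebinds `dag` to this value.
        return self


dag = Dag()
create, run, delete = (Task(name, dag) for name in ("create", "run", "delete"))
create >> run >> delete
watcher = Task("watcher", dag)
dag >>= watcher
dag >>= watcher  # second application adds nothing
print([t.task_id for t in dag.tasks if watcher in t.downstream])
# ['create', 'run', 'delete']
```

One design consequence: like the explicit list, the operator only covers tasks that exist when it is applied, so "dag >>= watcher" would have to come after all other tasks are defined.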

The operator might also be useful in other scenarios: when someone wants to
send notifications when any of the tasks failed
(https://stackoverflow.com/questions/50959743/airflow-trigger-rule-using-one-failed-cause-dag-failure),
or (I imagine) when you want to make sure to tear down all the infrastructure
you set up in a complex DAG, in cases similar to the one Cloudflare presented
at last year's Summit
(https://airflowsummit.org/sessions/2021/provision-as-a-service/). Some of
that can be done with cluster policies etc., but I think this might be a much
nicer way.

WDYT? Are there any better/simpler ways of solving this problem that we are
not aware of? Do you think this is enough justification to add a new
method/overloaded operator like that? Are there any strong "no"s, or could we
agree on it via lazy consensus if there are some supportive voices and no
viable alternative?

BTW, we are also open to changing (or dropping) the proposed ">>=" operator
if someone thinks it might be confusing.

J.
