dag.tasks >> watcher()
No new syntax nor pre-commit needed :)
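The one-liner works because of Python's reflected operators. A plain `list` defines no `__rshift__`, so for `list >> task` Python falls back to the right operand's `__rrshift__`, which Airflow operators implement. Below is a minimal, self-contained sketch of that mechanism. The `Task` class is hypothetical and only mimics the relevant part of Airflow's behaviour.

```python
from __future__ import annotations


class Task:
    """Hypothetical stand-in for an Airflow operator (not Airflow's real class)."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: set[str] = set()

    def __rshift__(self, other: Task) -> Task:
        # task >> other: record self as an upstream dependency of `other`.
        other.upstream.add(self.task_id)
        return other

    def __rrshift__(self, other: list[Task]) -> Task:
        # [a, b] >> task: list defines no __rshift__, so Python falls back
        # to this reflected method on the right-hand operand.
        for upstream_task in other:
            upstream_task >> self
        return self


dag_tasks = [Task("create_bucket"), Task("upload_file"), Task("delete_bucket")]
watcher = dag_tasks >> Task("watcher")
print(sorted(watcher.upstream))  # ['create_bucket', 'delete_bucket', 'upload_file']
```

Python evaluates the left operand before the right one. Assuming `dag.tasks` returns a snapshot list, the watcher created by `watcher()` is therefore not in that list and does not end up upstream of itself.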
On Thu, Feb 10 2022 at 15:35:58 +0100, Jarek Potiuk wrote:
Hey everyone,
I have a small proposal: adding a new overloaded operator (initially
proposed as ">>=", plus an equivalent method), which came out of our
discussion of AIP-47, the system tests refactoring
(<https://lists.apache.org/thread/htd4013yn483qfhwv11vc26jpf2yvjph>).
The ">>=" operator is not strictly necessary to complete AIP-47 (we
can work around it with somewhat complex pre-commit checks), but it
might simplify how we turn the example DAGs into system tests and make
them more maintainable. It might also simplify "real" DAGs for some
users.
Context:
In AIP-47 we need "status"-like functionality in system tests. The
basic idea is to keep all the "test code" in a single file: the
example DAG. This way each system test (currently spread among DAG
files, pytest tests and configuration) will "shrink" to a single
"example_dag" file. This is a great simplification and will be
extremely helpful for system tests, but it means we need "status"-like
functionality in the DAGs: the DAG run should fail if any of its tasks
failed during execution.
In a number of DAGs we have to run a "cleanup/tearDown" step last,
regardless of whether any of the tasks failed (this is easy to do with
a leaf task using the "all_done" trigger rule). But then that cleanup
task determines the status of the DAG run ("failed" or "succeeded").
So we need to bypass the "cleanup" task and, if any task fails,
execute the "watcher" task (as we named it) as a leaf node that
determines the status of the whole DAG run.
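To make the problem concrete, here is a deliberately simplified, pure-Python model (not Airflow code) of how a DAG run's final state follows from its leaf tasks. In this model the run fails if any leaf task ends up "failed" or "upstream_failed" and succeeds otherwise. A successful "all_done" cleanup leaf therefore hides an earlier failure, while a "one_failed" watcher leaf that always fails when it runs brings the failure back. The `run_dag` helper and all task names are hypothetical.

```python
from __future__ import annotations

FAILED_STATES = {"failed", "upstream_failed"}


def run_dag(upstream: dict[str, list[str]], rules: dict[str, str], failing: set[str]) -> str:
    """Simulate one DAG run and return "success" or "failed".

    `upstream` maps each task to its upstream tasks, in topological order;
    `rules` holds non-default trigger rules; tasks in `failing` fail if they run.
    """
    states: dict[str, str] = {}
    for task, parents in upstream.items():
        any_failed = any(states[p] in FAILED_STATES for p in parents)
        rule = rules.get(task, "all_success")
        if rule == "all_success" and any_failed:
            states[task] = "upstream_failed"
        elif rule == "one_failed" and not any_failed:
            states[task] = "skipped"
        else:  # the task runs ("all_done" runs whatever its upstream states are)
            states[task] = "failed" if task in failing else "success"

    # Leaf tasks are those that no other task lists as upstream.
    has_downstream = {p for parents in upstream.values() for p in parents}
    leaves = [t for t in upstream if t not in has_downstream]
    return "failed" if any(states[t] in FAILED_STATES for t in leaves) else "success"


# setup >> work >> cleanup, where cleanup uses the "all_done" trigger rule.
pipeline = {"setup": [], "work": ["setup"], "cleanup": ["work"]}
rules = {"cleanup": "all_done"}
print(run_dag(pipeline, rules, failing={"work"}))  # success: the failure is hidden

# Add a "one_failed" watcher downstream of every other task; it always fails when it runs.
with_watcher = {**pipeline, "watcher": ["setup", "work", "cleanup"]}
watcher_rules = {**rules, "watcher": "one_failed"}
print(run_dag(with_watcher, watcher_rules, failing={"work", "watcher"}))  # failed
print(run_dag(with_watcher, watcher_rules, failing={"watcher"}))  # success (watcher skipped)
```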
Proposed Solution:
Maybe there is an easier way than what we came up with (happy to hear
it), but our idea is to have a "watcher" leaf task with the
"one_failed" trigger rule that depends on all the other tasks in the
example DAG. As a result, all the somewhat more complex example DAGs
end up with:
    [task1, task2, task3, ...] >> watcher
This is a bit brittle because the list is long and you have to
remember to add any new tasks added to the example DAGs in the future
(and it's very easy to miss one, because nothing visibly breaks if you
do). We can automate this with pre-commit checks for the example DAGs,
but we could solve it more easily by adding a new operator that adds a
"one_failed" dependency from all other tasks to the watcher task:
Pseudocode (I proposed the ">>=" operator, but there should also be a
"set_" method):
    dag >>= watcher
What the operator would do:
    for task in dag.tasks:
        if task is not watcher:
            task >> watcher
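In Python, `dag >>= watcher` is dispatched to an `__irshift__` method on the DAG object, and the name `dag` is then rebound to whatever that method returns. An implementation must therefore return the DAG itself, or `dag` silently becomes `None`. The sketch below uses minimal hypothetical `Dag` and `Task` classes rather than Airflow's real ones. The method name `add_watcher` is also hypothetical, since the proposal leaves the name of the "set_" method open, and the watcher's "one_failed" trigger rule (which would be set on the watcher task itself) is not modeled.

```python
from __future__ import annotations


class Task:
    """Hypothetical stand-in for an Airflow operator."""

    def __init__(self, task_id: str, dag: Dag) -> None:
        self.task_id = task_id
        self.upstream: list[str] = []
        dag.tasks.append(self)  # register the task with its DAG

    def __rshift__(self, other: Task) -> Task:
        # task >> other: record self as an upstream dependency of `other`.
        other.upstream.append(self.task_id)
        return other


class Dag:
    """Hypothetical stand-in for an Airflow DAG."""

    def __init__(self) -> None:
        self.tasks: list[Task] = []

    def add_watcher(self, watcher: Task) -> None:
        # Make every other task upstream of the watcher, as in the pseudocode.
        for task in self.tasks:
            if task is not watcher:
                task >> watcher

    def __irshift__(self, watcher: Task) -> Dag:
        # `dag >>= watcher` rebinds `dag` to this return value, so return self.
        self.add_watcher(watcher)
        return self


dag = Dag()
extract, load = Task("extract", dag), Task("load", dag)
extract >> load
watcher = Task("watcher", dag)
dag >>= watcher
print(watcher.upstream)  # ['extract', 'load']
```

Having the operator delegate to a named method keeps both spellings available, which matches the proposal's "operator plus method" idea.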
The operator might also be useful in other scenarios: when someone
wants to send notifications if any of the tasks failed
(<https://stackoverflow.com/questions/50959743/airflow-trigger-rule-using-one-failed-cause-dag-failure>),
or (I imagine) when you want to make sure to tear down all the
infrastructure you set up in a complex DAG, in cases similar to the
one Cloudflare presented at last year's Airflow Summit
(<https://airflowsummit.org/sessions/2021/provision-as-a-service/>).
Some of that can be done with cluster policies etc., but I thought
this might be a much nicer way.
WDYT? Are there any better/simpler ways of solving this problem that
we are not aware of? Do you think this is "enough" justification to
add a new method/overloaded operator like that? Are there any strong
"no"s, or could we agree to it via lazy consensus if there are some
supportive voices and no viable alternative?
BTW, we are also open to changing (or dropping) the proposed ">>="
operator if someone thinks it might be confusing.
J.