The one caveat to this is you have to do it "last" (obviously) and the
direction matters.
For instance `watcher() << dag.tasks` doesn't work, as it tries to set
watcher as a dependency of itself, which falls foul of the cycle
detector (the Acyclic property of the DAG is enforced).
But given `list >> task` works fine, that's not a problem. Just FYI I guess
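To illustrate why the direction matters, here is a minimal toy model in plain Python. It is not Airflow code: `ToyDag`, `ToyTask` and `CycleError` are hypothetical stand-ins, and it assumes the task registers itself with the DAG when it is constructed and that `tasks` returns a snapshot. Under those assumptions, Python's left-to-right evaluation of operands explains the difference.

```python
class CycleError(Exception):
    """Raised by this toy model when a task would depend on itself."""


class ToyDag:
    def __init__(self):
        self._tasks = {}

    @property
    def tasks(self):
        # Return a snapshot of the tasks registered so far.
        return list(self._tasks.values())


class ToyTask:
    def __init__(self, task_id, dag):
        self.task_id = task_id
        self.upstream_ids = set()
        dag._tasks[task_id] = self  # registration happens at construction

    def __lshift__(self, others):
        # task << [a, b]: every item in the list becomes upstream of task.
        for other in others:
            if other is self:
                raise CycleError(f"{self.task_id} cannot depend on itself")
            self.upstream_ids.add(other.task_id)
        return others

    def __rrshift__(self, others):
        # [a, b] >> task: lists have no __rshift__, so Python falls back here.
        self << others
        return self


good = ToyDag()
ToyTask("a", good)
ToyTask("b", good)
# good.tasks is evaluated first, so the snapshot does not contain the watcher.
watcher = good.tasks >> ToyTask("watcher", good)
print(sorted(watcher.upstream_ids))  # ['a', 'b']

bad = ToyDag()
ToyTask("a", bad)
try:
    # The watcher is created first, so bad.tasks already includes it.
    ToyTask("watcher", bad) << bad.tasks
    reverse_failed = False
except CycleError:
    reverse_failed = True
print(reverse_failed)  # True
```

In the toy model the only difference between the two forms is which operand is evaluated first, which is why the `list >> task` form has to come "last".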
-ash
On Thu, Feb 10 2022 at 20:00:58 +0100, Jarek Potiuk <[email protected]>
wrote:
Ash! you are my hero :)
On Thu, Feb 10, 2022 at 7:58 PM Ash Berlin-Taylor <[email protected]> wrote:
dag.tasks >> watcher()
No new syntax nor pre-commit needed :)
On Thu, Feb 10 2022 at 15:35:58 +0100, Jarek Potiuk
<[email protected]> wrote:
Hey everyone,
I have a small proposal about adding a new overloaded operator -
initial proposal '>>=' (and a method), resulting from our
discussion on AIP-47 - system tests refactoring
(<https://lists.apache.org/thread/htd4013yn483qfhwv11vc26jpf2yvjph>).
The ">>=" operator is not strictly necessary to complete
AIP-47 (we have ways around it with somewhat complex
pre-commits), but it might simplify how we approach the
example DAGs turned into system tests and make them more
maintainable. It might also simplify "real" DAGs for some
users.
Context:
In the AIP-47 we have a need for "status" like functionality in
system tests. The basic idea is to encompass all "test code" in a
single file - example dag. This way each system test (currently
spread among DAG files, pytest tests and configuration)
will "shrink" to a single "example_dag" file. This is a great
simplification and it will be extremely helpful in system tests but
it means that we need a "status" like functionality in the DAGs
that will fail in case any of the tasks failed during the execution.
In a number of DAGs we have to do "cleanup/tearDown" as the last
step no matter whether any of the tasks failed (this is easily
done with a "leaf" task using the all_done rule), but then that
cleanup task determines the status of the DAG ("failed" or
"succeeded"). So we need to bypass the "cleanup" task and, if any
task fails, execute the "watcher" task (as we named it) as a leaf
node to determine the status of the whole DAG.
Proposed Solution:
Maybe there is an easier way than what we came up with (happy to
hear it), but the idea we have is to have a "watcher" leaf node
task that has "one_failed" dependency on all the tasks in the
example DAG. As a result, all but the simplest example DAGs
end up with:
[task1, task2, task3, ... ] >> watcher
This is a bit brittle because it is long and you have to remember
to include any tasks added to the example DAGs in the future (and
it's very easy to miss one, as nothing visibly breaks if you do).
We can automate it in pre-commits for example DAGs, but we could
easily solve it by adding a new operator that would add a
"one_failed" dependency from all other tasks to the watcher task:
Pseudocode (I proposed the >>= operator, but there should also be a
"set_" method):
    dag >>= watcher
The operator:
    for task in dag.tasks:
        if task is not watcher:
            task >> watcher
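As a rough sketch of how the proposed operator could behave, the pseudocode above can be written against a toy model in plain Python. Nothing here is Airflow's API: `SketchDag`, `SketchTask` and `add` are hypothetical names, and the "one_failed" trigger rule is left out so that only the wiring is shown.

```python
class SketchTask:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_ids = set()

    def __rshift__(self, other):
        # self >> other: self becomes an upstream dependency of other.
        other.upstream_ids.add(self.task_id)
        return other


class SketchDag:
    def __init__(self):
        self.tasks = []

    def add(self, task_id):
        # Hypothetical helper: create a task and register it with the DAG.
        task = SketchTask(task_id)
        self.tasks.append(task)
        return task

    def __irshift__(self, watcher):
        # dag >>= watcher: wire every other task into the watcher.
        for task in self.tasks:
            if task is not watcher:
                task >> watcher
        # Augmented assignment rebinds `dag` to whatever is returned here.
        return self


dag = SketchDag()
for name in ("task1", "task2", "task3"):
    dag.add(name)
watcher = dag.add("watcher")

dag >>= watcher
print(sorted(watcher.upstream_ids))  # ['task1', 'task2', 'task3']
```

One detail worth noting in the sketch: `__irshift__` has to return the DAG itself, otherwise `dag >>= watcher` would rebind `dag` to `None`.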
The operator might also be needed in other scenarios - when someone
wants to send notifications when any of the tasks failed
(<https://stackoverflow.com/questions/50959743/airflow-trigger-rule-using-one-failed-cause-dag-failure>)
or (I imagine) when you want to make sure to tear down the whole
infrastructure you set up in a complex DAG, in cases similar to
the one Cloudflare presented at the summit last year
(<https://airflowsummit.org/sessions/2021/provision-as-a-service/>).
Some of that can be done with cluster policies etc. But I thought
this might be a much nicer way.
WDYT? Are there any better/simpler ways of solving this problem
that we are not aware of? Do you think it is "enough" of a
justification to add a new method/overloaded operator like that?
Are there any strong "no"s, or maybe we could agree to it via lazy
consensus if there are some supportive voices and no viable
alternative?
BTW, we are also open to changing (or dropping) the proposed ">>="
operator if someone thinks it might be confusing.
J.