[
https://issues.apache.org/jira/browse/AIRFLOW-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Qian Yu updated AIRFLOW-5648:
-----------------------------
Description:
There are use cases where an external condition changes and a section of the
DAG needs to be re-run after its tasks have already finished. Here's an
example I recently encountered:
We have a DAG that runs the following in the morning for execution_date T. The
preliminary result of task J is needed at 9am. K, L and M need to wait for
Sensor to pass, so they are not done until much later in the evening:
{code:java}
A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
^ ^
| |
B >> D >> F>>>>> Sensor
{code}
Later, at 3pm, some external condition changes (indicated by Sensor). At that
point, we need to re-run task A and all its downstream tasks that have already
run (i.e. A, C, E, G, H, I, J) to reflect possible changes. Other finished
tasks such as B, D, F do not need to be re-run. The new results of J are
needed in the evening by downstream tasks K, L, M, which have been waiting.
One possible approach is to duplicate the section that needs to be re-run, so
that the DAG looks like the diagram below. A1 runs the same command as A, C1
runs the same command as C, etc. This mostly works, but it makes the DAG
unnecessarily large. It also causes code duplication, because the tasks A1, C1,
E1, G1, H1, I1, J1 are all identical to the original tasks. In this simplified
example the duplication does not look too bad, but in the real cases I faced,
task A has many downstream tasks with complex dependencies, and copying all of
them is more difficult. Because of this duplication, the next time someone
updates the DAG and inserts a new task between E and G, it will be easy to
forget to add it between E1 and G1 as well.
{code:java}
A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
^ ^
| |__________
B >> D >> F>>>> |
|
Sensor >> A1 >> C1 >> E1 >> G1 >> H1 >> I1 >> J1
{code}
Instead of duplicating the tasks, I'm proposing adding a ClearTaskOperator.
This operator takes an external_task_id as its parameter. When
ClearTaskOperator runs, it clears the state of the given external_task_id and
all its downstream tasks, which causes them to re-run. The problem I'm facing
can then be tackled without duplicating all those tasks. With
ClearTaskOperator, the DAG can look like this:
{code:java}
A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
^ ^
| |__________
B >> D >> F>>>> |
|
Sensor >> Clear_Task_A >>>>>>>>>>>>>>>>>>>>>>>
{code}
In the above DAG, Clear_Task_A is a ClearTaskOperator defined like this. When
Clear_Task_A executes, it clears task A and all its downstream tasks (so in
this case it causes A, C, E, G, H, I, J to be cleared and re-run).
{code:python}
Clear_Task_A = ClearTaskOperator(task_id="Clear_Task_A", external_task_id="A")
{code}
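To make the intended selection concrete, here is a minimal sketch in plain Python (no Airflow imports) of which tasks a clear starting at "A" would cause to re-run. It is only an illustration under stated assumptions, not the proposed operator's implementation: the EDGES mapping, the tasks_to_clear helper and the finished_at_3pm set are hypothetical names, and the links F -> G and Sensor -> Clear_Task_A -> K are guesses, since the diagrams do not make those join points explicit.

```python
from collections import deque

# Hypothetical adjacency list for the example DAG.
# ASSUMPTION: F feeds into G, and Sensor >> Clear_Task_A feeds into K;
# the diagrams in this issue do not state the exact join points.
EDGES = {
    "A": ["C"], "C": ["E"], "E": ["G"], "G": ["H"], "H": ["I"], "I": ["J"],
    "J": ["K"], "K": ["L"], "L": ["M"], "M": ["Finish"],
    "B": ["D"], "D": ["F"], "F": ["G"],
    "Sensor": ["Clear_Task_A"], "Clear_Task_A": ["K"],
}


def tasks_to_clear(edges, external_task_id, finished):
    """Return external_task_id and its downstream tasks that already finished.

    Walks the graph breadth-first from external_task_id, then keeps only
    the tasks present in `finished` (tasks that never ran have no state
    to clear, so they are left out of the result).
    """
    seen = {external_task_id}
    queue = deque([external_task_id])
    order = []
    while queue:
        task = queue.popleft()
        order.append(task)
        for child in edges.get(task, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return [task for task in order if task in finished]


# State at 3pm: the morning tasks and Sensor are done; K, L, M are waiting.
finished_at_3pm = {"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "Sensor"}

print(tasks_to_clear(EDGES, "A", finished_at_3pm))
# prints ['A', 'C', 'E', 'G', 'H', 'I', 'J']
```

B, D and F are upstream of the shared section rather than downstream of A, so they are not selected, which matches the behaviour described above.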
was:
There are use cases where an external condition changes and a section of the
DAG needs to be re-run after its tasks have already finished. Here's an
example I recently encountered:
We have a DAG that runs the following in the morning for execution_date T. The
preliminary result of task J is needed at 9am:
{code:java}
A >> C >> E >> G >> H >> I >> J >> Finish
^
|
B >> D >> F>>>>>
{code}
Later, at 3pm, some external condition changes (indicated by a Sensor). At
that point, we need to re-run task A and all its downstream tasks to reflect
the possible changes. The new results of J are needed in the evening.
One possible approach is to duplicate the section that needs to be re-run, so
that the DAG looks like the diagram below. A1 runs the same command as A, C1
runs the same command as C, etc. This mostly works, but it makes the DAG
unnecessarily large. It also causes code duplication, because the tasks A1, C1,
E1, G1, H1, I1, J1 are all identical to the original tasks. They are duplicated
because we need to re-run the same tasks in the afternoon. In this simplified
example the duplication does not look too bad, but in the real cases I faced,
task A has many downstream tasks with complex dependencies, and copying all of
them is more difficult. Also, because of this duplication, the next time
someone updates the DAG and inserts a new task between E and G, it will be easy
to forget to add it between E1 and G1 as well.
{code:java}
A >> C >> E >> G >> H >> I >> J >>>>>>>>> Finish
^ ^
| |
B >> D >> F>>>> |
|
Sensor >> A1 >> C1 >> E1 >> G1 >> H1 >> I1 >> J1
{code}
Instead of duplicating the tasks, I'm proposing adding a ClearTaskOperator.
This operator takes an external_task_id as its parameter. When
ClearTaskOperator runs, it clears the state of the given external_task_id and
all its downstream tasks. This will cause them to re-run. So the problem I'm
facing can be tackled without duplicating all those tasks. With
ClearTaskOperator, the DAG can look like this.
{code:java}
A >> C >> E >> G >> H >> I >> J >>>>>>>>>> Finish
^ ^
| |
B >> D >> F>>>> |
|
Sensor >> Clear_Task_A >>>>>>>>>>>>>>>>>>>>>>>
{code}
In the above DAG, Clear_Task_A is a ClearTaskOperator defined like this. When
Clear_Task_A executes, it clears task A and all its downstream tasks.
{code:python}
Clear_Task_A = ClearTaskOperator(task_id="Clear_Task_A", external_task_id="A")
{code}
> Add ClearTaskOperator to allow clearing/re-running tasks from within a DAG
> --------------------------------------------------------------------------
>
> Key: AIRFLOW-5648
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5648
> Project: Apache Airflow
> Issue Type: New Feature
> Components: operators
> Affects Versions: 1.10.5
> Reporter: Qian Yu
> Priority: Major
> Labels: ClearTaskOperator, airflow, clear, duplicate, operator,
> re-run, rerun, task
>