[ https://issues.apache.org/jira/browse/AIRFLOW-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Qian Yu updated AIRFLOW-5648:
-----------------------------
    Labels: ClearTaskOperator airflow clear duplicate operator re-run rerun task (was: airflow clear duplicate operator re-run rerun task)

> Add ClearTaskOperator to allow clearing/re-running tasks from within a DAG
> --------------------------------------------------------------------------
>
>                 Key: AIRFLOW-5648
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5648
>             Project: Apache Airflow
>          Issue Type: New Feature
>          Components: operators
>    Affects Versions: 1.10.5
>            Reporter: Qian Yu
>            Priority: Major
>              Labels: ClearTaskOperator, airflow, clear, duplicate, operator, re-run, rerun, task
>
> There are use cases where some external condition has changed and a section
> of the DAG needs to be re-run, even though those tasks have already finished.
> Here's one such example I recently encountered:
> We have a DAG that runs the following in the morning for execution_date T.
> The preliminary result of task J is needed at 9am:
> {code:java}
> A >> C >> E >> G >> H >> I >> J >> Finish
>                ^
>                |
> B >> D >> F>>>>>
> {code}
> Later, at 3pm, some external condition changes (detected by a Sensor). At
> that point, we need to re-run task A and all of its downstream tasks to
> reflect the possible changes. The new result of J is needed in the evening.
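A minimal, Airflow-independent sketch of which tasks "task A and all of its downstream tasks" covers in the first diagram. The edge list is hand-copied from the diagram (reading F's arrow as joining the chain at G; joining at E would give the same answer for A), and `EDGES` and `downstream_closure` are hypothetical names used only for illustration, not Airflow APIs. The function does a breadth-first walk over downstream edges starting from the given task.

```python
from collections import deque

# Hypothetical hand-encoded edges of the morning DAG (upstream, downstream).
EDGES = [
    ("A", "C"), ("C", "E"), ("E", "G"), ("G", "H"), ("H", "I"),
    ("I", "J"), ("J", "Finish"),
    ("B", "D"), ("D", "F"), ("F", "G"),  # assumption: F joins the chain at G
]


def downstream_closure(edges, start):
    """Return `start` plus every task reachable from it via downstream edges."""
    children = {}
    for up, down in edges:
        children.setdefault(up, []).append(down)
    seen = {start}
    queue = deque([start])
    while queue:  # breadth-first traversal
        task = queue.popleft()
        for child in children.get(task, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


print(sorted(downstream_closure(EDGES, "A")))
# ['A', 'C', 'E', 'Finish', 'G', 'H', 'I', 'J']
```

B, D and F are not in the result, so only the A-side of the DAG would need to run again in the afternoon.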
> One possible approach is to duplicate the section that needs to be re-run,
> so that the DAG looks like the diagram below. A1 runs the same command as A,
> C1 runs the same command as C, and so on. This mostly works, but it makes the
> DAG unnecessarily large. It also duplicates code: tasks A1, C1, E1, G1, H1,
> I1 and J1 are identical to the original tasks and exist only because the same
> tasks must be re-run in the afternoon. In this simplified example the
> duplication does not look too bad, but in the real cases I faced, task A has
> many downstream tasks with complex dependencies, and copying all of them is
> much harder. Also, because of this duplication, the next time someone updates
> the DAG and inserts a new task between E and G, it is easy to forget to insert
> it between E1 and G1 as well.
> {code:java}
> A >> C >> E >> G >> H >> I >>  J >>>>>>>>> Finish
>                ^                              ^
>                |                              |
> B >> D >> F>>>>                               |
>                                               |
> Sensor >> A1 >> C1 >> E1 >> G1 >> H1 >> I1 >> J1
> {code}
> Instead of duplicating the tasks, I'm proposing adding a ClearTaskOperator.
> This operator takes an external_task_id as its parameter. When
> ClearTaskOperator runs, it clears the state of the given external_task_id and
> all of its downstream tasks, which causes them to re-run. This solves the
> problem without duplicating any tasks. With ClearTaskOperator, the DAG looks
> like this:
> {code:java}
> A >> C >> E >> G >> H >> I >>  J >>>>>>>>>> Finish
>                ^                              ^
>                |                              |
> B >> D >> F>>>>                               |
>                                               |
> Sensor >> Clear_Task_A >>>>>>>>>>>>>>>>>>>>>>>
> {code}
> In the above DAG, Clear_Task_A is a ClearTaskOperator defined as follows.
> When Clear_Task_A executes, it clears task A and all of its downstream tasks.
> {code:python}
> # Proposed operator: clears task "A" and everything downstream of it.
> Clear_Task_A = ClearTaskOperator(task_id="Clear_Task_A", external_task_id="A")
> {code}
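To illustrate what "causes them to re-run" would mean in the ClearTaskOperator DAG, here is a small pure-Python model. It is not Airflow code, and `UPSTREAM` and `rerun_order` are hypothetical names. It assumes Clear_Task_A has just reset A, C, E, G, H, I, J and Finish to "no state" while B, D, F, Sensor and Clear_Task_A remain successful, and then repeatedly "runs" every cleared task whose upstream tasks have all succeeded, roughly the way a scheduler would pick them up.

```python
# Hypothetical model of the ClearTaskOperator DAG: task -> list of upstream tasks.
UPSTREAM = {
    "A": [], "C": ["A"], "E": ["C"], "G": ["E", "F"], "H": ["G"],
    "I": ["H"], "J": ["I"],
    "B": [], "D": ["B"], "F": ["D"],
    "Sensor": [], "Clear_Task_A": ["Sensor"],
    "Finish": ["J", "Clear_Task_A"],
}


def rerun_order(upstream, states):
    """Return waves of tasks that become runnable after a clear.

    A state of None means "cleared / no state"; "success" means done.
    Each wave contains every cleared task whose upstream tasks have all
    succeeded. The input dict is not modified.
    """
    states = dict(states)  # work on a copy
    waves = []
    while True:
        ready = sorted(
            task for task, state in states.items()
            if state is None and all(states[u] == "success" for u in upstream[task])
        )
        if not ready:
            return waves
        for task in ready:
            states[task] = "success"  # pretend the task ran successfully
        waves.append(ready)


# Assumed state right after Clear_Task_A has run.
cleared = {"A", "C", "E", "G", "H", "I", "J", "Finish"}
after_clear = {task: (None if task in cleared else "success") for task in UPSTREAM}

print(rerun_order(UPSTREAM, after_clear))
# [['A'], ['C'], ['E'], ['G'], ['H'], ['I'], ['J'], ['Finish']]
```

Each wave holds a single task here because the cleared section is a chain; G becomes runnable as soon as E is re-run, since F keeps its earlier success state and B, D and F are never re-run.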



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
