[ https://issues.apache.org/jira/browse/AIRFLOW-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Qian Yu closed AIRFLOW-5648.
----------------------------
Resolution: Won't Fix
See comments on the closed PR: [https://github.com/apache/airflow/pull/6392]
The issue is addressed with a simpler approach. The related need to clear tasks
across DAGs when using ExternalTaskSensor is addressed by a new PR:
[https://github.com/apache/airflow/pull/6633]
> Add ClearTaskOperator to allow clearing/re-running tasks from within a DAG
> --------------------------------------------------------------------------
>
> Key: AIRFLOW-5648
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5648
> Project: Apache Airflow
> Issue Type: New Feature
> Components: operators
> Affects Versions: 1.10.5
> Reporter: Qian Yu
> Assignee: Qian Yu
> Priority: Major
> Labels: ClearTaskOperator, airflow, clear, duplicate, operator,
> re-run, rerun, task
>
> There are use cases where some external condition changes and a section of
> the DAG that has already finished needs to be re-run.
> Here's one such example I recently encountered:
> We have a DAG that runs the following in the morning for execution_date T.
> The preliminary result of task J is needed at 9am. K, L and M need to wait
> for Sensor to pass, so they do not finish until much later in the evening:
> {code:java}
> A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
>                ^                   ^
>                |                   |
> B >> D >> F>>>>>                Sensor
> {code}
> Later, at 3pm, some external condition changes (indicated by Sensor). At
> that point, we need to re-run task A and all of its downstream tasks that have
> already run (i.e. A, C, E, G, H, I, J) to reflect the change. Other finished
> tasks such as B, D and F do not need to be re-run. The new result of J is
> needed in the evening by the downstream tasks K, L and M, which have been
> waiting.
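> The selection rule above (A plus everything downstream of it, limited to the
> tasks that have already run) can be checked with a small standalone Python
> model. This is not Airflow code: the edge list is transcribed from the
> diagram, and the set of tasks finished at 3pm is an assumption based on the
> description.

```python
from collections import deque

# Edges of the example DAG, transcribed from the diagram:
# A >> C >> E >> G >> ... >> Finish, B >> D >> F >> G, and Sensor >> K.
EDGES = {
    "A": ["C"], "C": ["E"], "E": ["G"], "G": ["H"], "H": ["I"],
    "I": ["J"], "J": ["K"], "K": ["L"], "L": ["M"], "M": ["Finish"],
    "B": ["D"], "D": ["F"], "F": ["G"], "Sensor": ["K"],
}


def downstream_closure(edges, root):
    """Return root plus every task reachable from it (breadth-first search)."""
    seen = {root}
    queue = deque([root])
    while queue:
        task = queue.popleft()
        for child in edges.get(task, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


# Assumed state at 3pm: A..J and B, D, F have finished;
# Sensor, K, L, M and Finish have not run yet.
FINISHED_AT_3PM = {"A", "B", "C", "D", "E", "F", "G", "H", "I", "J"}

# Only tasks downstream of A that already ran need to be re-run.
to_rerun = downstream_closure(EDGES, "A") & FINISHED_AT_3PM
print(sorted(to_rerun))  # ['A', 'C', 'E', 'G', 'H', 'I', 'J']
```

> B, D and F are excluded because they are not reachable from A, even though
> F feeds into G.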
> One possible approach is to duplicate the section that needs to be re-run,
> so that the DAG looks like the diagram below. A1 runs the same command as A,
> C1 runs the same command as C, and so on. This mostly works, but it makes the
> DAG unnecessarily large. It also duplicates code, because the tasks A1, C1,
> E1, G1, H1, I1, J1 are identical to the original tasks. In this simplified
> example the duplication does not look too bad, but in the real cases I faced,
> task A has many downstream tasks with complex dependencies, and copying all of
> them is much harder. Because of this duplication, the next time someone
> updates the DAG and inserts a new task between E and G, it will be easy to
> forget to add a matching task between E1 and G1 as well.
> {code:java}
> A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
>                ^                   ^
>                |                   |__________
> B >> D >> F>>>>                               |
>                                               |
> Sensor >> A1 >> C1 >> E1 >> G1 >> H1 >> I1 >> J1
> {code}
> Instead of duplicating the tasks, I propose adding a ClearTaskOperator.
> This operator takes an external_task_id as its parameter. When
> ClearTaskOperator runs, it clears the state of the given external_task_id and
> all of its downstream tasks, which causes them to re-run. This solves the
> problem without duplicating any tasks. With ClearTaskOperator, the DAG can
> look like this:
> {code:java}
> A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
>                ^                   ^
>                |                   |
> B >> D >> F>>>>                    |
>                                    |
> Sensor >> Clear_Task_A >>>>>>>>>>>>>
> {code}
> In the above DAG, Clear_Task_A is a ClearTaskOperator defined as follows. When
> Clear_Task_A executes, it clears task A and all of its downstream tasks; in
> this case, the tasks that have already run (A, C, E, G, H, I, J) are cleared
> and re-run.
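> {code:python}
> # Proposed operator: clears task "A" and all of its downstream tasks
> # in the same DAG run so that they are re-run.
> Clear_Task_A = ClearTaskOperator(task_id="Clear_Task_A",
>                                  external_task_id="A")
> {code}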
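> Because the issue was closed as Won't Fix, ClearTaskOperator does not exist in
> Airflow. The proposed clearing rule can still be modelled in plain Python. In
> this sketch, clear_task, edges and states are hypothetical names (not Airflow
> APIs), clearing is modelled as resetting a task's state to None, and the task
> states at 3pm are assumed from the example.

```python
from typing import Dict, List, Optional, Set


def clear_task(edges: Dict[str, List[str]],
               states: Dict[str, Optional[str]],
               external_task_id: str) -> Set[str]:
    """Toy model of the proposed ClearTaskOperator (hypothetical, not Airflow):
    reset external_task_id and everything downstream of it to state None."""
    to_clear: Set[str] = set()
    stack = [external_task_id]
    while stack:  # iterative depth-first walk over downstream edges
        task = stack.pop()
        if task in to_clear:
            continue
        to_clear.add(task)
        stack.extend(edges.get(task, []))
    for task in to_clear:
        states[task] = None  # a None state means "not run yet" in this model
    return to_clear


# The proposed DAG: Sensor >> Clear_Task_A >> K replaces the A1..J1 copies.
edges = {
    "A": ["C"], "C": ["E"], "E": ["G"], "G": ["H"], "H": ["I"],
    "I": ["J"], "J": ["K"], "K": ["L"], "L": ["M"], "M": ["Finish"],
    "B": ["D"], "D": ["F"], "F": ["G"],
    "Sensor": ["Clear_Task_A"], "Clear_Task_A": ["K"],
}

# Assumed state at 3pm, when Clear_Task_A starts running.
states: Dict[str, Optional[str]] = {t: "success" for t in "ABCDEFGHIJ"}
states.update({"Sensor": "success", "Clear_Task_A": "running",
               "K": None, "L": None, "M": None, "Finish": None})

previously_done = {t for t, s in states.items() if s == "success"}
cleared = clear_task(edges, states, "A")
rerun = cleared & previously_done
print(sorted(rerun))  # ['A', 'C', 'E', 'G', 'H', 'I', 'J']
```

> K, L, M and Finish are also in the cleared set because they are downstream of
> J, but they had not run yet, so clearing them has no effect in this model;
> only A, C, E, G, H, I and J are actually re-run.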
--
This message was sent by Atlassian Jira
(v8.3.4#803005)