[
https://issues.apache.org/jira/browse/AIRFLOW-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16957613#comment-16957613
]
ASF GitHub Bot commented on AIRFLOW-5648:
-----------------------------------------
yuqian90 commented on pull request #6392: [AIRFLOW-5648] Add ClearTaskOperator
for clearing tasks in a DAG
URL: https://github.com/apache/airflow/pull/6392
Make sure you have checked _all_ steps below.
### Jira
- [ ] My PR addresses the following [Airflow
Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references
them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
- https://issues.apache.org/jira/browse/AIRFLOW-5648
### Description
- [ ] Here are some details about my PR, including screenshots of any UI
changes:
- ClearTaskOperator is modeled after the "Clear" button in the Web UI. It
can be used to clear a given external task, causing it to be re-run by the
scheduler.
- Workflows that require rerunning a chain of completed tasks at a later
stage can be easily constructed with ClearTaskOperator.
- Real-world examples I've seen include:
- Re-running a given daily task (and its downstream tasks) for the whole
month after some monthly statistics have been generated on the last day of the
month
- Re-running a chain of tasks that finished in the morning after
additional data has been retrieved in the evening
- An illustrated example can be found in the JIRA description:
https://issues.apache.org/jira/browse/AIRFLOW-5648
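
For the month-end use case listed above, the daily runs to clear span from the first day of the month through the month-end run. A minimal sketch of that date arithmetic in plain Python follows; the helper name `daily_runs_in_month` is hypothetical and is not part of this PR or of Airflow.

```python
from datetime import date, timedelta


def daily_runs_in_month(execution_date):
    """Return every date from the first of the month up to execution_date."""
    first = execution_date.replace(day=1)
    days = (execution_date - first).days
    return [first + timedelta(days=offset) for offset in range(days + 1)]


# Example: a month-end run on 2019-10-31 covers all 31 daily runs of October.
runs = daily_runs_in_month(date(2019, 10, 31))
print(runs[0], runs[-1], len(runs))
```

Each returned date would correspond to one daily run whose task (and downstream tasks) gets cleared.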
### Tests
- [ ] My PR adds the following unit tests __OR__ does not need testing for
this extremely good reason:
### Commits
- [ ] My commits all reference Jira issues in their subject lines, and I
have squashed multiple commits if they address the same issue. In addition, my
commits follow the guidelines from "[How to write a good git commit
message](http://chris.beams.io/posts/git-commit/)":
1. Subject is separated from body by a blank line
1. Subject is limited to 50 characters (not including Jira issue reference)
1. Subject does not end with a period
1. Subject uses the imperative mood ("add", not "adding")
1. Body wraps at 72 characters
1. Body explains "what" and "why", not "how"
### Documentation
- [ ] In case of new functionality, my PR adds documentation that describes
how to use it.
- All the public functions and the classes in the PR contain docstrings
that explain what they do
- If you implement backwards-incompatible changes, please leave a note in
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so
we can assign it to an appropriate release
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
> Add ClearTaskOperator to allow clearing/re-running tasks from within a DAG
> --------------------------------------------------------------------------
>
> Key: AIRFLOW-5648
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5648
> Project: Apache Airflow
> Issue Type: New Feature
> Components: operators
> Affects Versions: 1.10.5
> Reporter: Qian Yu
> Assignee: Qian Yu
> Priority: Major
> Labels: ClearTaskOperator, airflow, clear, duplicate, operator,
> re-run, rerun, task
>
> There are use cases where some external conditions have changed and a section
> of the DAG needs to be re-run after it has already finished.
> Here's an example I recently encountered:
> We have a DAG that runs the following in the morning for execution_date T.
> The preliminary result of task J is needed at 9am. K, L and M
> need to wait for Sensor to pass, so they are not done until much later in the
> evening:
> {code:java}
> A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
> ^ ^
> | |
> B >> D >> F>>>>> Sensor
> {code}
> Later, at 3pm, some external condition changes (indicated
> by Sensor). At that point, we need to re-run task A and all its downstream
> tasks that have already run (i.e. A, C, E, G, H, I, J) to reflect possible
> changes. Other finished tasks such as B, D, F do not need to be re-run. The
> new result of J is needed in the evening by downstream tasks K, L, M that
> have been waiting.
> One possible approach is to make the DAG look like this by duplicating the
> section that needs to be re-run. In the following diagram, A1 runs the same
> command as A, C1 runs the same command as C, etc. This mostly works, but it
> makes the DAG unnecessarily large. It also causes code duplication
> because the tasks A1, C1, E1, G1, H1, I1, J1 are all identical to the
> original tasks. In this simplified example, the duplication does not look too
> bad, but in the real examples I faced, task A has many downstream tasks with
> complex dependencies. Copying all of them is more difficult. Because of this
> duplication, the next time someone updates the DAG and inserts a new task
> between E and G, it'll be hard to remember to add it between E1 and
> G1 as well.
> {code:java}
> A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
> ^ ^
> | |__________
> B >> D >> F>>>> |
> |
> Sensor >> A1 >> C1 >> E1 >> G1 >> H1 >> I1 >> J1
> {code}
> Instead of duplicating the tasks, I'm proposing adding a ClearTaskOperator.
> This operator takes an external_task_id as its parameter. When
> ClearTaskOperator runs, it clears the state of the given external_task_id and
> all its downstream tasks. This will cause them to re-run. So the problem I'm
> facing can be tackled without duplicating all those tasks. With
> ClearTaskOperator, the DAG can look like this.
> {code:java}
> A >> C >> E >> G >> H >> I >> J >> K >> L >> M >> Finish
> ^ ^
> | |
> B >> D >> F>>>> |
> |
> Sensor >> Clear_Task_A >>>>>>>>>>>>>
> {code}
> In the above DAG, Clear_Task_A is a ClearTaskOperator defined like this. When
> Clear_Task_A executes, it clears task A and all its downstream tasks (so in
> this case it causes A, C, E, G, H, I, J to be cleared and re-run).
> {code:python}
> Clear_Task_A = ClearTaskOperator(task_id="Clear_Task_A",
>                                  external_task_id="A")
> {code}
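
The clearing behavior described above can be illustrated without Airflow. The following is a minimal, hypothetical sketch in plain Python: the `DOWNSTREAM` map, the `states` dictionary, and the `clear_task_and_downstream` helper are assumptions made up for illustration, not the operator's actual implementation or any Airflow API. It models only the main chain of the example DAG (the B, D, F branch is omitted) and resets every already-finished task reachable from the given task.

```python
from collections import deque

# Hypothetical downstream map for the main chain of the example DAG.
DOWNSTREAM = {
    "A": ["C"], "C": ["E"], "E": ["G"], "G": ["H"], "H": ["I"],
    "I": ["J"], "J": ["K"], "K": ["L"], "L": ["M"], "M": ["Finish"],
    "Finish": [],
    "Sensor": ["Clear_Task_A"],
    "Clear_Task_A": ["K"],
}


def clear_task_and_downstream(task_id, states):
    """Reset the state of task_id and every finished task downstream of it.

    Returns the cleared task ids in breadth-first order.
    """
    cleared = []
    seen = {task_id}
    queue = deque([task_id])
    while queue:
        current = queue.popleft()
        # Only tasks that already have a state need to be reset.
        if states.get(current) is not None:
            states[current] = None
            cleared.append(current)
        for child in DOWNSTREAM.get(current, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return cleared


# Assumed state at 3pm: A..J have succeeded; K, L, M, Finish have not run yet.
states = {t: "success" for t in ["A", "C", "E", "G", "H", "I", "J"]}
states.update({t: None for t in ["K", "L", "M", "Finish"]})

cleared = clear_task_and_downstream("A", states)
print(cleared)  # ['A', 'C', 'E', 'G', 'H', 'I', 'J']
```

In this toy model a state of `None` stands in for "cleared, waiting to be scheduled again"; K, L, M and Finish are visited but left alone because they have not run yet.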
--
This message was sent by Atlassian Jira
(v8.3.4#803005)