[
https://issues.apache.org/jira/browse/AIRFLOW-1488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712260#comment-16712260
]
ASF GitHub Bot commented on AIRFLOW-1488:
-----------------------------------------
ybendana opened a new pull request #4291: [AIRFLOW-1488] Add the DagRunSensor
operator
URL: https://github.com/apache/incubator-airflow/pull/4291
ExternalTaskSensor allows one to wait on any valid combination of
(dag_id, task_id). It is desirable though to be able to wait on entire
DagRuns, as opposed to specific task instances in those DAGs.
This pull request adds a new sensor in contrib called DagRunSensor.
This version is a different approach from previous pull requests
addressing the same issue. In this case, the DagRunSensor takes a
trigger_task_id, the id of a task that triggers DagRuns. The trigger
task returns a list of run_ids of the DagRuns it triggered and the
DagRunSensor polls their status. For this purpose the
TriggerDagRunOperator was modified so that it stores the run_id of the
triggered DagRun.
Make sure you have checked _all_ steps below.
### Jira
- [ ] My PR addresses the following [Airflow
Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references
them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
- https://issues.apache.org/jira/browse/AIRFLOW-XXX
- In case you are fixing a typo in the documentation you can prepend your
commit with \[AIRFLOW-XXX\], code changes always need a Jira issue.
### Description
- [ ] Here are some details about my PR, including screenshots of any UI
changes:
See the above description. I also copied quite a bit from the previous pull
requests on this issue #2500 and #3234. Unlike those attempts, the focus here
is on triggered DagRuns. I realize it may not be generally applicable to
everyone's workflows but it has been very useful for us. At one point I
considered using subdags but since they have had their issues I think this a
good alternative.
### Tests
- [ ] My PR adds the following unit tests __OR__ does not need testing for
this extremely good reason:
I included test_dagrun_sensor.py and test_dagrun_sensor_dag.py. Currently,
I'm running into an issue executing the test in the Docker environment because
the sensor task has to wait on the triggered DagRun. The DagRun is triggered
but is never scheduled. I think this is because the unit test uses the
SequentialExecutor and SQLite. If I run the scheduler manually, the triggered
DagRun executes. I'm not sure what to do about this. For now, I've commented
the successful DagRun test and have only the failed DagRun test active.
### Commits
- [ ] My commits all reference Jira issues in their subject lines, and I
have squashed multiple commits if they address the same issue. In addition, my
commits follow the guidelines from "[How to write a good git commit
message](http://chris.beams.io/posts/git-commit/)":
1. Subject is separated from body by a blank line
1. Subject is limited to 50 characters (not including Jira issue reference)
1. Subject does not end with a period
1. Subject uses the imperative mood ("add", not "adding")
1. Body wraps at 72 characters
1. Body explains "what" and "why", not "how"
### Documentation
- [ ] In case of new functionality, my PR adds documentation that describes
how to use it.
- When adding new operators/hooks/sensors, the autoclass documentation
generation needs to be added.
- All the public functions and the classes in the PR contain docstrings
that explain what it does
### Code Quality
- [ ] Passes `flake8`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add a sensor operator to wait on DagRuns
> ----------------------------------------
>
> Key: AIRFLOW-1488
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1488
> Project: Apache Airflow
> Issue Type: New Feature
> Components: contrib, operators
> Reporter: Yati
> Assignee: Yati
> Priority: Major
>
> The
> [ExternalTaskSensor|https://airflow.incubator.apache.org/code.html#airflow.operators.ExternalTaskSensor]
> operator already allows for encoding dependencies on tasks in external DAGs.
> However, when you have teams, each owning multiple small-to-medium sized
> DAGs, it is desirable to be able to wait on an external DagRun as a whole.
> This allows the owners of an upstream DAG to refactor their code freely by
> splitting/squashing task responsibilities, without worrying about dependent
> DAGs breaking.
> I'll now enumerate the easiest ways of achieving this that come to mind:
> * Make all DAGs always have a join DummyOperator in the end, with a task id
> that follows some convention, e.g., "{{ dag_id }}.__end__".
> * Make ExternalTaskSensor poke for a DagRun instead of TaskInstances when the
> external_task_id argument is None.
> * Implement a separate DagRunSensor operator.
> After considerations, we decided to implement a separate operator, which
> we've been using in the team for our workflows, and I think it would make a
> good addition to contrib.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)