[
https://issues.apache.org/jira/browse/AIRFLOW-1488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17181438#comment-17181438
]
ASF GitHub Bot commented on AIRFLOW-1488:
-----------------------------------------
ybendana commented on pull request #4291:
URL: https://github.com/apache/airflow/pull/4291#issuecomment-677895538
I changed the
[TriggerDagRunOperator](https://github.com/ybendana/incubator-airflow/blob/dagrunsensor-sqsh2/airflow/operators/dagrun_operator.py)
to save the run_id with `self.run_id = dro.run_id`. Pushing the run_id to
xcom is a good option if you use the TriggerDagRunOperator in a dag. I call it
in the PythonOperator so I can loop over it.
When I call TriggerDagRunOperator I save the run_id in a list and return it.
I usually do this in a loop to trigger child dags.
```
def triggerchild():
tdr = triggerdag('childdag')
return [tdr.run_id]
t0 = PythonOperator(python_callable=triggerchild,
task_id='trigger_child')
t1 = TriggeredDagRunSensor(
task_id='sense_child',
trigger_task_id='trigger_child',
timeout=15,
poke_interval=1,
)
```
Here is my triggerdag function:
```
from dagrun_operator import TriggerDagRunOperator
def set_up_dag_run(context, dag_run_obj):
# This will be available in target dag context as kwargs['dag_run'].conf
dag_run_obj.payload = context
dag_run_obj.run_id = context.get('dagrunid', dag_run_obj.run_id)
return dag_run_obj
def triggerdag(dag_id, params={}, owner='airflow'):
"""Create a task to trigger a dag.
Parameters:
dag_id [str]: Dag to trigger
params [dict]: Dagrun conf parameters. The DagRun id will be set if
'dagrunid' is present.
owner [str]: Dag owner
Returns:
TriggerDagRunOperator: new task
"""
task_id = 'trigger_{}'.format(dag_id)
dagrun = TriggerDagRunOperator(
trigger_dag_id=dag_id, python_callable=set_up_dag_run,
task_id=task_id, owner=owner)
dagrun.execute(params)
return dagrun
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add a sensor operator to wait on DagRuns
> ----------------------------------------
>
> Key: AIRFLOW-1488
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1488
> Project: Apache Airflow
> Issue Type: New Feature
> Components: contrib, operators
> Reporter: Yati
> Assignee: Yati
> Priority: Major
>
> The
> [ExternalTaskSensor|https://airflow.incubator.apache.org/code.html#airflow.operators.ExternalTaskSensor]
> operator already allows for encoding dependencies on tasks in external DAGs.
> However, when you have teams, each owning multiple small-to-medium sized
> DAGs, it is desirable to be able to wait on an external DagRun as a whole.
> This allows the owners of an upstream DAG to refactor their code freely by
> splitting/squashing task responsibilities, without worrying about dependent
> DAGs breaking.
> I'll now enumerate the easiest ways of achieving this that come to mind:
> * Make all DAGs always have a join DummyOperator in the end, with a task id
> that follows some convention, e.g., "{{ dag_id }}.__end__".
> * Make ExternalTaskSensor poke for a DagRun instead of TaskInstances when the
> external_task_id argument is None.
> * Implement a separate DagRunSensor operator.
> After considerations, we decided to implement a separate operator, which
> we've been using in the team for our workflows, and I think it would make a
> good addition to contrib.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)