ybendana commented on pull request #4291: URL: https://github.com/apache/airflow/pull/4291#issuecomment-677895538
I changed the [TriggerDagRunOperator](https://github.com/ybendana/incubator-airflow/blob/dagrunsensor-sqsh2/airflow/operators/dagrun_operator.py) to save the run_id with `self.run_id = dro.run_id`. Pushing the run_id to XCom is a good option if you use the TriggerDagRunOperator directly as a task in a DAG. I call it inside a PythonOperator instead so that I can loop over it: each time I call TriggerDagRunOperator I save the run_id in a list, and the callable returns that list. I usually do this in a loop to trigger several child DAGs.

```
def triggerchild():
    # Trigger the child DAG and return its run_id in a list for the sensor
    tdr = triggerdag('childdag')
    return [tdr.run_id]

t0 = PythonOperator(python_callable=triggerchild, task_id='trigger_child')
t1 = TriggeredDagRunSensor(
    task_id='sense_child',
    trigger_task_id='trigger_child',
    timeout=15,
    poke_interval=1,
)
```

Here is my triggerdag function:

```
from dagrun_operator import TriggerDagRunOperator


def set_up_dag_run(context, dag_run_obj):
    # This will be available in target dag context as kwargs['dag_run'].conf
    dag_run_obj.payload = context
    # Use the caller-supplied run_id if one was given, else keep the default
    dag_run_obj.run_id = context.get('dagrunid', dag_run_obj.run_id)
    return dag_run_obj


def triggerdag(dag_id, params=None, owner='airflow'):
    """Create a task to trigger a dag.

    Parameters:
        dag_id [str]: Dag to trigger
        params [dict]: Dagrun conf parameters. The DagRun id will be set if
            'dagrunid' is present.
        owner [str]: Dag owner

    Returns:
        TriggerDagRunOperator: new task
    """
    # Avoid a shared mutable default argument
    if params is None:
        params = {}
    task_id = 'trigger_{}'.format(dag_id)
    dagrun = TriggerDagRunOperator(
        trigger_dag_id=dag_id,
        python_callable=set_up_dag_run,
        task_id=task_id,
        owner=owner)
    # params is passed as the context, so it becomes the DagRun payload
    dagrun.execute(params)
    return dagrun
```
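For the loop case, a minimal sketch might look like the following. It only illustrates how the run_ids are collected. The `triggerdag` here is a stand-in stub so the snippet runs without Airflow, and the `trig__` run_id format and the `triggerchildren` name are assumptions made for illustration, not what the modified operator produces.

```python
from types import SimpleNamespace


def triggerdag(dag_id, params=None):
    """Stand-in for the real triggerdag helper (assumption: no Airflow here).

    Returns an object with a run_id attribute, like the modified operator.
    """
    params = params or {}
    # Hypothetical run_id format; the real operator generates its own.
    run_id = params.get('dagrunid', 'trig__{}'.format(dag_id))
    return SimpleNamespace(run_id=run_id)


def triggerchildren(dag_ids):
    """Trigger each child DAG in turn and collect the run_ids."""
    run_ids = []
    for dag_id in dag_ids:
        tdr = triggerdag(dag_id)
        run_ids.append(tdr.run_id)
    return run_ids
```

With the real helper, `triggerchildren` would be the `python_callable` of the PythonOperator (for example with `op_args=[['childdag_a', 'childdag_b']]`, where the DAG ids are placeholders), and its return value is what the PythonOperator pushes to XCom.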
