ybendana commented on pull request #4291:
URL: https://github.com/apache/airflow/pull/4291#issuecomment-677895538


   I changed the 
[TriggerDagRunOperator](https://github.com/ybendana/incubator-airflow/blob/dagrunsensor-sqsh2/airflow/operators/dagrun_operator.py)
 to save the run_id with `self.run_id = dro.run_id`.  Pushing the run_id to 
xcom is a good option if you use the TriggerDagRunOperator in a dag.  I call it 
in the PythonOperator so I can loop over it.
   
   When I call TriggerDagRunOperator I save the run_id in a list and return it. 
 I usually do this in a loop to trigger child dags.
   
   ```
   def triggerchild():
           tdr = triggerdag('childdag')
           return [tdr.run_id]
   t0 = PythonOperator(python_callable=triggerchild,
                           task_id='trigger_child')
   t1 = TriggeredDagRunSensor(
           task_id='sense_child',
           trigger_task_id='trigger_child',
           timeout=15,
           poke_interval=1,
       )
   ```
   
   Here is my triggerdag function:
   
   ```
   from dagrun_operator import TriggerDagRunOperator
   
   
   def set_up_dag_run(context, dag_run_obj):                                    
                                                                                
                                                                                
    
       # This will be available in target dag context as kwargs['dag_run'].conf 
                                                                                
                                                                                
    
       dag_run_obj.payload = context                                            
                                                                                
                                                                                
    
       dag_run_obj.run_id = context.get('dagrunid', dag_run_obj.run_id)         
                                                                                
                                                                                
    
       return dag_run_obj                                                       
                                                                                
                                                                                
   
   def triggerdag(dag_id, params={}, owner='airflow'):
       """Create a task to trigger a dag.
   
       Parameters:
           dag_id [str]: Dag to trigger
           params [dict]: Dagrun conf parameters.  The DagRun id will be set if 
'dagrunid' is present.
           owner [str]: Dag owner
   
       Returns:
           TriggerDagRunOperator: new task
       """
       task_id = 'trigger_{}'.format(dag_id)
       dagrun = TriggerDagRunOperator(
           trigger_dag_id=dag_id, python_callable=set_up_dag_run,
           task_id=task_id, owner=owner)
       dagrun.execute(params)
       return dagrun
   ```
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to