[ https://issues.apache.org/jira/browse/AIRFLOW-47?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15286604#comment-15286604 ]
Hila Visan edited comment on AIRFLOW-47 at 5/17/16 2:57 PM:
------------------------------------------------------------

Hi,

It looks like adding _'priority_weight'_ to the DAG arguments solves the problem. It causes the scheduler to queue the hourly jobs before the daily jobs.

[~jlowin] I would appreciate it if you could provide a working example of using xcom as you described.
I have the hourly DAG in the description, and I should add _'xcom_push=True'_ (or use another PythonOperator).
The daily DAG runs once a day. *How can I ensure (in the daily DAG) that all hourly tasks of the previous day were executed?*
If I understand correctly, xcom_pull() receives the last pushed value.

Thanks


was (Author: hilaviz):
Hi,

It looks like adding _'priority_weight'_ to the DAG arguments solves the problem. It causes the scheduler to queue the hourly jobs before the daily jobs.

[~jlowin] I would appreciate it if you could provide a working example of using xcom as you described.
I have the hourly DAG in the description, and I should add _'xcom_push=True'_.
How can I ensure (in the daily DAG) that all hourly tasks of the previous day were executed?


> ExternalTaskSensor causes scheduling dead lock
> ----------------------------------------------
>
>                 Key: AIRFLOW-47
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-47
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: operators, scheduler
>    Affects Versions: Airflow 1.7.0
>         Environment: CentOS 6.5
>                      Airflow 1.7.0 with SequentialExecutor
>            Reporter: Hila Visan
>         Attachments: screenshot-1.png
>
>
> We are trying to use 'ExternalTaskSensor' to coordinate between a daily DAG
> and an hourly DAG (daily dags depend on hourly).
> Relevant code:
> *Daily DAG definition:*
> {code:title=2_daily_dag.py|borderStyle=solid}
> default_args = {
>     …
>     'start_date': datetime(2016, 4, 2),
>     …
> }
> dag = DAG(dag_id='2_daily_agg', default_args=default_args,
>           schedule_interval="@daily")
> ext_dep = ExternalTaskSensor(
>     external_dag_id='1_hourly_agg',
>     external_task_id='print_hourly1',
>     task_id='evening_hours_sensor',
>     dag=dag)
> {code}
> *Hourly DAG definition:*
> {code:title=1_hourly_dag.py|borderStyle=solid}
> default_args = {
>     …
>     'start_date': datetime(2016, 4, 1),
>     …
> }
> dag = DAG(dag_id='1_hourly_agg', default_args=default_args,
>           schedule_interval="@hourly")
> t1 = BashOperator(
>     task_id='print_hourly1',
>     bash_command='echo hourly job1',
>     dag=dag)
> {code}
> The hourly dag was executed twice, for the following execution dates:
> 04-01T00:00:00
> 04-01T01:00:00
> Then the daily dag was executed, and it is still running.
> According to the logs, the daily dag is waiting for the hourly dag to complete:
> {noformat}
> [2016-05-04 06:01:20,978] {models.py:1041} INFO - Executing <Task(ExternalTaskSensor): evening_hours_sensor> on 2016-04-03 00:00:00
> [2016-05-04 06:01:20,984] {sensors.py:188} INFO - Poking for 1_hourly_agg.print_hourly1 on 2016-04-02 00:00:00 ...
> [2016-05-04 06:02:21,053] {sensors.py:188} INFO - Poking for 1_hourly_agg.print_hourly1 on 2016-04-02 00:00:00 ...
> {noformat}
> How can I solve this deadlock?
> In addition, I don't understand whether this means that the daily dag depends only
> on the "last" hourly dag of the same day (23:00-24:00).
> What happens if the hourly dag for another hour fails?
> Thanks a lot!



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
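
On the question of how the daily DAG could confirm that every hourly run of a given day finished: a daily run would have to account for 24 hourly execution dates rather than a single one. The sketch below only works out those dates in plain Python and reports which ones are still missing. The helper names `hourly_execution_dates` and `missing_hours` are hypothetical and not part of Airflow, and the list of completed runs is supplied by hand here. How the completed runs would actually be obtained (one sensor per hour, xcom, or a lookup of task state) is left open.

```python
from datetime import datetime, timedelta


def hourly_execution_dates(day_start):
    """Return the 24 hourly execution dates within the day containing day_start.

    Hypothetical helper for illustration; not an Airflow API.
    """
    midnight = day_start.replace(hour=0, minute=0, second=0, microsecond=0)
    return [midnight + timedelta(hours=h) for h in range(24)]


def missing_hours(day_start, completed):
    """Return the hourly execution dates of that day that are not in `completed`."""
    done = set(completed)
    return [d for d in hourly_execution_dates(day_start) if d not in done]


if __name__ == "__main__":
    # The two hourly runs reported in the issue description.
    completed = [datetime(2016, 4, 1, 0), datetime(2016, 4, 1, 1)]
    pending = missing_hours(datetime(2016, 4, 1), completed)
    print(len(pending), "hourly runs still missing for 2016-04-01")
```

A daily task that waits on the whole day would proceed only once `missing_hours` returns an empty list for that day.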