[ https://issues.apache.org/jira/browse/AIRFLOW-140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15291446#comment-15291446 ]
Chris Riccomini commented on AIRFLOW-140:
-----------------------------------------

If so, try the LocalExecutor, and see if you see the same behavior.

> DagRun state not updated
> ------------------------
>
>                 Key: AIRFLOW-140
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-140
>             Project: Apache Airflow
>          Issue Type: Bug
>         Environment: Airflow latest Git version
>            Reporter: dud
>            Priority: Minor
>
> Hello
> I've noticed a strange behaviour: when launching a DAG whose task execution
> duration alternates between shorter and longer, the DagRun state is only
> updated once all previous DagRuns have ended.
> Here is a DAG that can trigger this behaviour:
> {code}
> from airflow import DAG
> from airflow.operators import PythonOperator
> from datetime import datetime, timedelta
> from time import sleep
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 5, 19, 10, 15),
>     'end_date': datetime(2016, 5, 19, 10, 20),
> }
>
> dag = DAG('dagrun_not_updated', default_args=default_args,
>           schedule_interval=timedelta(minutes=1))
>
> def alternating_sleep(**kwargs):
>     # Sleep 300 s when the execution minute is odd, 10 s when it is even.
>     minute = kwargs['execution_date'].strftime("%M")
>     is_odd = int(minute) % 2
>     if is_odd:
>         sleep(300)
>     else:
>         sleep(10)
>     return True
>
> PythonOperator(
>     task_id='alt_sleep',
>     python_callable=alternating_sleep,
>     provide_context=True,
>     dag=dag)
> {code}
> When this operator is executed, a TI whose execution date falls on an even
> minute runs faster than one whose execution date falls on an odd minute.
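For reference, the alternation in the DAG above can be checked without Airflow. The following is a minimal sketch and not part of the original report: `sleep_seconds` and `expected_sleeps` are hypothetical helper names, and the sketch assumes one run per minute from start_date through end_date inclusive, which may not match exactly what the scheduler creates.

```python
from datetime import datetime, timedelta


def sleep_seconds(execution_date):
    """Mirror the branch in alternating_sleep without actually sleeping."""
    minute = int(execution_date.strftime("%M"))
    # Odd minutes take the slow path (300 s), even minutes the fast path (10 s).
    return 300 if minute % 2 else 10


def expected_sleeps(start, end, step):
    """Map each execution date in [start, end] to its expected sleep time.

    Assumption for illustration only: one run per step, end inclusive.
    """
    result = {}
    current = start
    while current <= end:
        result[current] = sleep_seconds(current)
        current += step
    return result


if __name__ == "__main__":
    sleeps = expected_sleeps(
        datetime(2016, 5, 19, 10, 15),
        datetime(2016, 5, 19, 10, 20),
        timedelta(minutes=1),
    )
    for execution_date, seconds in sorted(sleeps.items()):
        print(execution_date.strftime("%H:%M"), seconds)
```

Under these assumptions, a fast run (even minute) always sits between two slow runs (odd minutes), which is the situation in which the report says the finished DagRun stays in the running state.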
> I'm observing the following behaviour:
> - After some time, the second DagRun is still in the running state even
> though its task ended a while ago:
> {code}
> airflow=> SELECT * FROM task_instance WHERE dag_id = :dag_id ORDER BY execution_date ; SELECT * FROM dag_run WHERE dag_id = :dag_id ;
> task_id   | dag_id             | execution_date      | start_date                 | end_date                   | duration  | state   | try_number | hostname  | unixname | job_id | pool | queue   | priority_weight | operator       | queued_dttm
> ----------+--------------------+---------------------+----------------------------+----------------------------+-----------+---------+------------+-----------+----------+--------+------+---------+-----------------+----------------+-------------
> alt_sleep | dagrun_not_updated | 2016-05-19 10:15:00 | 2016-05-19 10:17:19.039565 |                            |           | running | 1          | localhost | airflow  | 3196   |      | default | 1               | PythonOperator |
> alt_sleep | dagrun_not_updated | 2016-05-19 10:16:00 | 2016-05-19 10:17:23.698928 | 2016-05-19 10:17:33.823066 | 10.124138 | success | 1          | localhost | airflow  | 3197   |      | default | 1               | PythonOperator |
> alt_sleep | dagrun_not_updated | 2016-05-19 10:17:00 | 2016-05-19 10:18:03.025546 |                            |           | running | 1          | localhost | airflow  | 3198   |      | default | 1               | PythonOperator |
> (3 rows)
>
> id   | dag_id             | execution_date      | state   | run_id                         | external_trigger | conf | end_date | start_date
> -----+--------------------+---------------------+---------+--------------------------------+------------------+------+----------+----------------------------
> 1479 | dagrun_not_updated | 2016-05-19 10:15:00 | running | scheduled__2016-05-19T10:15:00 | f                |      |          | 2016-05-19 10:17:06.563842
> 1480 | dagrun_not_updated | 2016-05-19 10:16:00 | running | scheduled__2016-05-19T10:16:00 | f                |      |          | 2016-05-19 10:17:12.188781
> 1481 | dagrun_not_updated | 2016-05-19 10:17:00 | running | scheduled__2016-05-19T10:17:00 | f                |      |          | 2016-05-19 10:18:01.550625
> (3 rows)
> {code}
> - After some time, all DagRuns still reported as running are marked as
> successful at the same time:
> {code}
> 2016-05-19 10:23:11 UTC [12073-18] airflow@airflow LOG: duration: 0.168 ms statement: UPDATE dag_run SET state='success' WHERE dag_run.id = 1479
> 2016-05-19 10:23:11 UTC [12073-19] airflow@airflow LOG: duration: 0.106 ms statement: UPDATE dag_run SET state='success' WHERE dag_run.id = 1480
> 2016-05-19 10:23:11 UTC [12073-20] airflow@airflow LOG: duration: 0.083 ms statement: UPDATE dag_run SET state='success' WHERE dag_run.id = 1481
> 2016-05-19 10:23:11 UTC [12073-21] airflow@airflow LOG: duration: 0.081 ms statement: UPDATE dag_run SET state='success' WHERE dag_run.id = 1482
> {code}
> So it waited until the 4th DagRun ended before updating the dag_run table.
> I've looked at the code, and I'm not sure whether the issue lies in Airflow,
> as the scheduler properly runs the code that updates the state to success:
> {code}
> May 19 10:17:36 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:17:36,542] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:17:41 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:17:41,666] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:17:51 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:17:51,571] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:17:56 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:17:56,578] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:18:01 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:18:01,591] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:18:06 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:18:06,735] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:18:16 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:18:16,599] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:18:21 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:18:21,623] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:18:31 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:18:31,651] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:18:41 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:18:41,611] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:18:46 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:18:46,625] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:18:56 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:18:56,619] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:19:01 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:19:01,640] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:19:07 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:19:07,355] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:19:16 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:19:16,633] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:19:21 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:19:21,710] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:19:21 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:19:21,711] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: False> successful
> May 19 10:19:31 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:19:31,646] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:19:31 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:19:31,647] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: False> successful
> May 19 10:19:36 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:19:36,650] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:19:36 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:19:36,651] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: False> successful
> May 19 10:19:41 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:19:41,656] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:19:41 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:19:41,657] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: False> successful
> May 19 10:19:51 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:19:51,659] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:19:51 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:19:51,659] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: False> successful
> May 19 10:19:56 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:19:56,664] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:19:56 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:19:56,664] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: False> successful
> May 19 10:20:01 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:20:01,670] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:20:01 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:20:01,671] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: False> successful
> May 19 10:20:06 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:20:06,669] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:20:06 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:20:06,674] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: False> successful
> May 19 10:20:11 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:20:11,739] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:20:11 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:20:11,739] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: False> successful
> May 19 10:20:21 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:20:21,726] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:20:21 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:20:21,727] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: False> successful
> May 19 10:20:31 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:20:31,699] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:20:31 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:20:31,699] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: False> successful
> May 19 10:20:36 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:20:36,700] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: False> successful
> May 19 10:20:36 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 10:20:36,700] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: False> successful
> {code}
> I've also verified that the scheduler runs session.commit(), but for some
> reason this doesn't trigger any database sync.
> Please note that I have the following parameters in my configuration, which
> may be related to the behaviour reported above:
> {code}
> parallelism = 4
> max_active_runs_per_dag = 4
> {code}
> dud

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)