[ 
https://issues.apache.org/jira/browse/AIRFLOW-140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini updated AIRFLOW-140:
------------------------------------
    Component/s: scheduler

> DagRun state not updated
> ------------------------
>
>                 Key: AIRFLOW-140
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-140
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>         Environment: Airflow latest Git version
>            Reporter: dud
>            Priority: Minor
>
> Hello
> I've noticed a strange behaviour: when launching a DAG whose task execution 
> duration alternates between short and long, a DagRun's state is only updated 
> once all previous DagRuns have ended.
> Here is a DAG that can trigger this behaviour:
> {code}
> from airflow import DAG
> from airflow.operators import PythonOperator
> from datetime import datetime, timedelta
> from time import sleep
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 5, 19, 10, 15),
>     'end_date': datetime(2016, 5, 19, 10, 20),
> }
> dag = DAG('dagrun_not_updated', default_args=default_args, 
> schedule_interval=timedelta(minutes=1))
> def alternating_sleep(**kwargs):
>     minute = kwargs['execution_date'].strftime("%M")
>     is_odd = int(minute) % 2
>     if is_odd:
>         sleep(300)
>     else:
>         sleep(10)
>     return True
> PythonOperator(
>     task_id='alt_sleep',
>     python_callable=alternating_sleep,
>     provide_context=True,
>     dag=dag)
> {code}
> With this operator, a TI whose execution date falls on an even minute runs 
> faster than one whose execution date falls on an odd minute.
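
To make the alternation concrete, here is a minimal sketch that replays the parity test from alternating_sleep without sleeping. It assumes one run per minute from start_date to end_date inclusive; the names sleep_seconds and schedule are illustrative and are not part of the reported DAG.

```python
from datetime import datetime, timedelta


def sleep_seconds(execution_date):
    """Mirror the parity test in alternating_sleep, returning the delay only."""
    # Odd minutes take the slow branch (300 s), even minutes the fast one (10 s).
    return 300 if execution_date.minute % 2 else 10


# Window taken from default_args in the report.
start = datetime(2016, 5, 19, 10, 15)
end = datetime(2016, 5, 19, 10, 20)

# Assumption: one execution date per minute, both bounds included.
schedule = []
current = start
while current <= end:
    schedule.append((current, sleep_seconds(current)))
    current += timedelta(minutes=1)

for execution_date, seconds in schedule:
    print(execution_date.strftime("%H:%M"), seconds)
```

Under these assumptions the 10:15, 10:17 and 10:19 runs are the slow ones, so every fast run is scheduled right after a slow run that is still in progress.
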
> I'm observing the following behaviour :
> - after some time, the second DagRun is still in the running state even 
> though its task ended a while ago:
> {code}
> airflow=> SELECT * FROM task_instance WHERE dag_id = :dag_id ORDER BY execution_date ;  SELECT * FROM dag_run WHERE dag_id = :dag_id ;
>  task_id   | dag_id             | execution_date      | start_date                 | end_date                   | duration  | state   | try_number | hostname  | unixname | job_id | pool | queue   | priority_weight | operator       | queued_dttm
> -----------+--------------------+---------------------+----------------------------+----------------------------+-----------+---------+------------+-----------+----------+--------+------+---------+-----------------+----------------+-------------
>  alt_sleep | dagrun_not_updated | 2016-05-19 10:15:00 | 2016-05-19 10:17:19.039565 |                            |           | running |          1 | localhost | airflow  |   3196 |      | default |               1 | PythonOperator |
>  alt_sleep | dagrun_not_updated | 2016-05-19 10:16:00 | 2016-05-19 10:17:23.698928 | 2016-05-19 10:17:33.823066 | 10.124138 | success |          1 | localhost | airflow  |   3197 |      | default |               1 | PythonOperator |
>  alt_sleep | dagrun_not_updated | 2016-05-19 10:17:00 | 2016-05-19 10:18:03.025546 |                            |           | running |          1 | localhost | airflow  |   3198 |      | default |               1 | PythonOperator |
> (3 rows)
>  id   | dag_id             | execution_date      | state   | run_id                         | external_trigger | conf | end_date | start_date
> ------+--------------------+---------------------+---------+--------------------------------+------------------+------+----------+----------------------------
>  1479 | dagrun_not_updated | 2016-05-19 10:15:00 | running | scheduled__2016-05-19T10:15:00 | f                |      |          | 2016-05-19 10:17:06.563842
>  1480 | dagrun_not_updated | 2016-05-19 10:16:00 | running | scheduled__2016-05-19T10:16:00 | f                |      |          | 2016-05-19 10:17:12.188781
>  1481 | dagrun_not_updated | 2016-05-19 10:17:00 | running | scheduled__2016-05-19T10:17:00 | f                |      |          | 2016-05-19 10:18:01.550625
> (3 rows)
> {code}
> - after some time, all the DagRuns still reported as running are marked as 
> successful at the same time:
> {code}
> 2016-05-19 10:23:11 UTC [12073-18] airflow@airflow LOG:  duration: 0.168 ms  
> statement: UPDATE dag_run SET state='success' WHERE dag_run.id = 1479
> 2016-05-19 10:23:11 UTC [12073-19] airflow@airflow LOG:  duration: 0.106 ms  
> statement: UPDATE dag_run SET state='success' WHERE dag_run.id = 1480
> 2016-05-19 10:23:11 UTC [12073-20] airflow@airflow LOG:  duration: 0.083 ms  
> statement: UPDATE dag_run SET state='success' WHERE dag_run.id = 1481
> 2016-05-19 10:23:11 UTC [12073-21] airflow@airflow LOG:  duration: 0.081 ms  
> statement: UPDATE dag_run SET state='success' WHERE dag_run.id = 1482
> {code}
> So it waited till the 4th DagRun ended to update the dag_run table.
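
The mismatch in the first snapshot can be checked mechanically. The sketch below loads the relevant columns of that snapshot into an in-memory SQLite database and lists DagRuns that are still marked running although every one of their task instances has succeeded. The two-table schema is a simplified stand-in for illustration, not Airflow's actual schema, and the query name STALE_RUNS is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Simplified stand-in schema: only the columns needed for the check.
conn.executescript("""
CREATE TABLE task_instance (
    task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT);
CREATE TABLE dag_run (
    id INTEGER, dag_id TEXT, execution_date TEXT, state TEXT);
""")

# Rows copied from the snapshot quoted in the report.
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("alt_sleep", "dagrun_not_updated", "2016-05-19 10:15:00", "running"),
        ("alt_sleep", "dagrun_not_updated", "2016-05-19 10:16:00", "success"),
        ("alt_sleep", "dagrun_not_updated", "2016-05-19 10:17:00", "running"),
    ],
)
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?, ?)",
    [
        (1479, "dagrun_not_updated", "2016-05-19 10:15:00", "running"),
        (1480, "dagrun_not_updated", "2016-05-19 10:16:00", "running"),
        (1481, "dagrun_not_updated", "2016-05-19 10:17:00", "running"),
    ],
)

# A run counts as stale when it is 'running', has at least one task
# instance, and has no task instance in a state other than 'success'.
STALE_RUNS = """
SELECT dr.id, dr.execution_date
FROM dag_run AS dr
WHERE dr.state = 'running'
  AND EXISTS (
        SELECT 1 FROM task_instance AS ti
        WHERE ti.dag_id = dr.dag_id
          AND ti.execution_date = dr.execution_date)
  AND NOT EXISTS (
        SELECT 1 FROM task_instance AS ti
        WHERE ti.dag_id = dr.dag_id
          AND ti.execution_date = dr.execution_date
          AND ti.state != 'success')
ORDER BY dr.execution_date
"""

stale_runs = conn.execute(STALE_RUNS).fetchall()
print(stale_runs)
```

With the snapshot data, only run 1480 (execution date 10:16) is reported, which matches the second DagRun described above.
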
> I've looked at the code, and I'm not sure whether the issue lies in Airflow, 
> as the scheduler properly runs the code that updates the state to success:
> {code}
> May 19 10:17:36 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:17:36,542] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:17:41 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:17:41,666] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:17:51 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:17:51,571] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:17:56 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:17:56,578] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:18:01 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:18:01,591] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:18:06 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:18:06,735] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:18:16 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:18:16,599] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:18:21 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:18:21,623] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:18:31 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:18:31,651] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:18:41 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:18:41,611] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:18:46 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:18:46,625] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:18:56 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:18:56,619] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:19:01 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:19:01,640] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:19:07 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:19:07,355] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:19:16 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:19:16,633] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:19:21 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:19:21,710] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:19:21 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:19:21,711] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: 
> False> successful
> May 19 10:19:31 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:19:31,646] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:19:31 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:19:31,647] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: 
> False> successful
> May 19 10:19:36 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:19:36,650] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:19:36 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:19:36,651] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: 
> False> successful
> May 19 10:19:41 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:19:41,656] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:19:41 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:19:41,657] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: 
> False> successful
> May 19 10:19:51 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:19:51,659] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:19:51 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:19:51,659] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: 
> False> successful
> May 19 10:19:56 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:19:56,664] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:19:56 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:19:56,664] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: 
> False> successful
> May 19 10:20:01 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:20:01,670] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:20:01 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:20:01,671] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: 
> False> successful
> May 19 10:20:06 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:20:06,669] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:20:06 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:20:06,674] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: 
> False> successful
> May 19 10:20:11 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:20:11,739] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:20:11 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:20:11,739] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: 
> False> successful
> May 19 10:20:21 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:20:21,726] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:20:21 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:20:21,727] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: 
> False> successful
> May 19 10:20:31 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:20:31,699] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:20:31 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:20:31,699] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: 
> False> successful
> May 19 10:20:36 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:20:36,700] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:16:00: scheduled__2016-05-19T10:16:00, externally triggered: 
> False> successful
> May 19 10:20:36 airflow-ec2 airflow-scheduler[11543]: [2016-05-19 
> 10:20:36,700] {models.py:2725} INFO - Marking run <DagRun dagrun_not_updated 
> @ 2016-05-19 10:18:00: scheduled__2016-05-19T10:18:00, externally triggered: 
> False> successful
> {code}
> I've also verified that the scheduler runs session.commit(), but for some 
> reason this doesn't trigger any database sync.
> Please note that I have the following parameters in my configuration, which 
> may be related to the behaviour reported above:
> {code}
> parallelism = 4
> max_active_runs_per_dag = 4
> {code}
> dud



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
