[ https://issues.apache.org/jira/browse/AIRFLOW-7071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marko Kattelus updated AIRFLOW-7071:
------------------------------------
    Description: 
This is a question about a possible improvement to the queuing behaviour of 
LocalExecutor.

Here is an example log from our test system.
{code:java}
14:22:45 scheduler.1 | [2020-02-20 14:22:45,015] \{base_executor.py:122} DEBUG 
- 8 running task instances
14:22:45 scheduler.1 | [2020-02-20 14:22:45,015] \{base_executor.py:123} DEBUG 
- 0 in queue
14:22:45 scheduler.1 | [2020-02-20 14:22:45,015] \{base_executor.py:124} DEBUG 
- 0 open slots
14:22:45 scheduler.1 | [2020-02-20 14:22:45,016] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method
14:22:47 scheduler.1 | [2020-02-20 14:22:47,200] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_17_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:47 scheduler.1 | [2020-02-20 14:22:47,200] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_16_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:47 scheduler.1 | [2020-02-20 14:22:47,201] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_15_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:47 scheduler.1 | [2020-02-20 14:22:47,202] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_14_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:47 scheduler.1 | [2020-02-20 14:22:47,202] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_13_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:47 scheduler.1 | [2020-02-20 14:22:47,203] \{base_executor.py:122} DEBUG 
- 8 running task instances
14:22:47 scheduler.1 | [2020-02-20 14:22:47,203] \{base_executor.py:123} DEBUG 
- 5 in queue
14:22:47 scheduler.1 | [2020-02-20 14:22:47,203] \{base_executor.py:124} DEBUG 
- 0 open slots
14:22:47 scheduler.1 | [2020-02-20 14:22:47,203] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method
14:22:47 scheduler.1 | [2020-02-20 14:22:47,204] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_4_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:47 scheduler.1 | [2020-02-20 14:22:47,205] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_19_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:49 scheduler.1 | [2020-02-20 14:22:49,128] \{base_executor.py:122} DEBUG 
- 6 running task instances
14:22:49 scheduler.1 | [2020-02-20 14:22:49,128] \{base_executor.py:123} DEBUG 
- 0 in queue
14:22:49 scheduler.1 | [2020-02-20 14:22:49,128] \{base_executor.py:124} DEBUG 
- 2 open slots
14:22:49 scheduler.1 | [2020-02-20 14:22:49,128] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method
14:22:49 scheduler.1 | [2020-02-20 14:22:49,129] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_1_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:49 scheduler.1 | [2020-02-20 14:22:49,130] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_3_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:49 scheduler.1 | [2020-02-20 14:22:49,131] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_6_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:51 scheduler.1 | [2020-02-20 14:22:51,136] \{base_executor.py:122} DEBUG 
- 3 running task instances
14:22:51 scheduler.1 | [2020-02-20 14:22:51,136] \{base_executor.py:123} DEBUG 
- 0 in queue
14:22:51 scheduler.1 | [2020-02-20 14:22:51,136] \{base_executor.py:124} DEBUG 
- 5 open slots
14:22:51 scheduler.1 | [2020-02-20 14:22:51,137] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method
14:22:53 scheduler.1 | [2020-02-20 14:22:53,104] \{base_executor.py:122} DEBUG 
- 3 running task instances
14:22:53 scheduler.1 | [2020-02-20 14:22:53,104] \{base_executor.py:123} DEBUG 
- 0 in queue
14:22:53 scheduler.1 | [2020-02-20 14:22:53,105] \{base_executor.py:124} DEBUG 
- 5 open slots
14:22:53 scheduler.1 | [2020-02-20 14:22:53,105] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method
14:22:53 scheduler.1 | [2020-02-20 14:22:53,105] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_5_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:55 scheduler.1 | [2020-02-20 14:22:55,111] \{base_executor.py:122} DEBUG 
- 2 running task instances
14:22:55 scheduler.1 | [2020-02-20 14:22:55,111] \{base_executor.py:123} DEBUG 
- 0 in queue
14:22:55 scheduler.1 | [2020-02-20 14:22:55,111] \{base_executor.py:124} DEBUG 
- 6 open slots
14:22:55 scheduler.1 | [2020-02-20 14:22:55,111] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method
14:22:55 scheduler.1 | [2020-02-20 14:22:55,112] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_10_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:57 scheduler.1 | [2020-02-20 14:22:57,263] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'load_dag_test_6_entity_5_2_45', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,264] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_17_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,265] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_16_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,265] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_15_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,266] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_14_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,266] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_13_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,267] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_12_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,267] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_11_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,268] \{base_executor.py:122} DEBUG 
- 1 running task instances
14:22:57 scheduler.1 | [2020-02-20 14:22:57,268] \{base_executor.py:123} DEBUG 
- 8 in queue
14:22:57 scheduler.1 | [2020-02-20 14:22:57,268] \{base_executor.py:124} DEBUG 
- 7 open slots
14:22:57 scheduler.1 | [2020-02-20 14:22:57,281] \{local_executor.py:85} INFO - 
QueuedLocalWorker running
14:22:57 scheduler.1 | [2020-02-20 14:22:57,281] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method
14:22:57 scheduler.1 | [2020-02-20 14:22:57,282] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_18_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:59 scheduler.1 | [2020-02-20 14:22:59,124] \{base_executor.py:122} DEBUG 
- 7 running task instances
14:22:59 scheduler.1 | [2020-02-20 14:22:59,124] \{base_executor.py:123} DEBUG 
- 0 in queue
14:22:59 scheduler.1 | [2020-02-20 14:22:59,124] \{base_executor.py:124} DEBUG 
- 1 open slots
14:22:59 scheduler.1 | [2020-02-20 14:22:59,124] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method

{code}
The issue is that the implementation clears the queue, and the order of 
operations is such that LocalExecutor may be kept idle for no reason.

For example:
 # 8 running task instances, 0 in queue, 0 open slots
 # 5 new tasks pushed to the queue
 # 2 tasks finish
 # 6 running task instances, 0 in queue, 2 open slots
 # 3 tasks finish
 # 3 running task instances, 0 in queue, 5 open slots
 # 1 task finishes
 # 2 running task instances, 0 in queue, 6 open slots
 # 8 new tasks pushed to the queue
 # 1 running task instance, 8 in queue, 7 open slots
 # 1 task finishes
 # 7 running task instances, 0 in queue, 1 open slot
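
The sequence above can be reproduced with a small toy model. This is a sketch under stated assumptions, not Airflow code: ToyExecutor and scheduler_loop are hypothetical names, finished tasks are assumed to be noticed only inside the sync step (which the log shows being called after open slots are counted), and the end-of-loop removal of queued tasks is modelled as simply clearing the queue.

{code:python}
from collections import deque


class ToyExecutor:
    """Toy model of the heartbeat order seen in the log; not Airflow code."""

    def __init__(self, parallelism, running):
        self.parallelism = parallelism
        self.running = set(running)  # task ids currently executing
        self.queued = deque()        # task ids waiting for a slot

    def open_slots(self):
        return self.parallelism - len(self.running)

    def heartbeat(self, finished):
        # 1. Start as many queued tasks as there are open slots right now.
        for _ in range(min(self.open_slots(), len(self.queued))):
            self.running.add(self.queued.popleft())
        # 2. Only afterwards does "sync" notice the tasks that finished.
        self.running -= set(finished)


def scheduler_loop(executor, new_tasks, finished):
    """One loop: queue tasks, heartbeat, then drop whatever is still queued."""
    executor.queued.extend(new_tasks)
    executor.heartbeat(finished)
    # 3. Assumed model of the end-of-loop removal: leftover tasks are
    #    dropped and must wait until their DAG is scheduled again.
    executor.queued.clear()
    return len(executor.running), len(executor.queued), executor.open_slots()


# Steps 1-4: 8 running, 5 new tasks, 2 tasks finish during sync.
ex = ToyExecutor(parallelism=8, running=[f"r{i}" for i in range(8)])
print(scheduler_loop(ex, [f"n{i}" for i in range(5)], finished=["r0", "r1"]))
# (6, 0, 2)

# Steps 9-12: 1 running, 8 new tasks, the running task finishes during sync.
ex = ToyExecutor(parallelism=8, running=["r0"])
print(scheduler_loop(ex, [f"n{i}" for i in range(8)], finished=["r0"]))
# (7, 0, 1)
{code}

Both printed tuples (running, queued, open slots) match the counts the log reports at the following heartbeat; that only shows the toy model is consistent with the log, not that Airflow is implemented this way.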

So, depending on how often the DAG gets rescheduled, it may take a long time 
before LocalExecutor receives more tasks. It also seems that other DAGs may 
get scheduled just when some tasks finish and effectively take the place of 
the previously queued tasks, because those are always removed at the end of 
every scheduler loop.

Would it be beneficial to keep queued tasks in the LocalExecutor queue, so 
that LocalExecutor can start the next task as soon as a slot becomes 
available instead of waiting until the DAG is scheduled again?
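
A minimal sketch of that proposal, under the same toy assumptions: PersistentQueueExecutor is a hypothetical name, and the only change from the toy model of today's behaviour is that tasks left over after a heartbeat stay queued instead of being dropped. It ignores how such leftover tasks would be kept consistent with the task states the scheduler records in its database.

{code:python}
from collections import deque


class PersistentQueueExecutor:
    """Hypothetical executor whose queue survives between scheduler loops."""

    def __init__(self, parallelism, running):
        self.parallelism = parallelism
        self.running = set(running)  # task ids currently executing
        self.queued = deque()        # task ids waiting for a slot

    def open_slots(self):
        return self.parallelism - len(self.running)

    def heartbeat(self, new_tasks=(), finished=()):
        self.queued.extend(new_tasks)
        # Same order as the toy model of today: trigger, then notice completions.
        for _ in range(min(self.open_slots(), len(self.queued))):
            self.running.add(self.queued.popleft())
        self.running -= set(finished)
        # No clearing: leftovers stay queued for the next heartbeat.
        return len(self.running), len(self.queued), self.open_slots()


# Steps 9-12: the 8th task is no longer dropped.
ex = PersistentQueueExecutor(parallelism=8, running=["r0"])
print(ex.heartbeat(new_tasks=[f"n{i}" for i in range(8)], finished=["r0"]))
# (7, 1, 1)
# It starts on the next heartbeat, without a new scheduling pass.
print(ex.heartbeat())
# (8, 0, 0)
{code}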

Also, as seen in steps 1-4 and 9-12, the order of operations in the 
LocalExecutor heartbeat seems to mean that slots freed up during a scheduler 
loop are not all filled: only the slots that were already open before the 
current heartbeat get used, or at least that is how it looks.
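
The ordering point can be sketched the same way. SyncFirstExecutor is hypothetical and, like the toy models above, is not Airflow code: it simply swaps the two steps of the toy heartbeat, so completions are processed before open slots are counted and slots freed since the previous heartbeat are filled in the same call.

{code:python}
from collections import deque


class SyncFirstExecutor:
    """Hypothetical heartbeat that records completions before triggering."""

    def __init__(self, parallelism, running):
        self.parallelism = parallelism
        self.running = set(running)  # task ids currently executing
        self.queued = deque()        # task ids waiting for a slot

    def open_slots(self):
        return self.parallelism - len(self.running)

    def heartbeat(self, new_tasks=(), finished=()):
        self.queued.extend(new_tasks)
        # 1. Process completions first, so freed slots count as open.
        self.running -= set(finished)
        # 2. Then fill every open slot from the queue.
        for _ in range(min(self.open_slots(), len(self.queued))):
            self.running.add(self.queued.popleft())
        return len(self.running), len(self.queued), self.open_slots()


# Steps 9-12 again: the finished task frees its slot in time for all
# 8 queued tasks to start in the same heartbeat.
ex = SyncFirstExecutor(parallelism=8, running=["r0"])
print(ex.heartbeat(new_tasks=[f"n{i}" for i in range(8)], finished=["r0"]))
# (8, 0, 0)
{code}

Under this ordering, steps 1-4 would start 2 of the 5 new tasks immediately (8 running, 3 still queued) instead of leaving 2 slots idle.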

 

  was:
This is a question about a possible improvement to the queuing behaviour of 
LocalExecutor.

Here is an example log from our test system.
{code:java}
14:22:45 scheduler.1 | [2020-02-20 14:22:45,015] \{base_executor.py:122} DEBUG 
- 8 running task instances
14:22:45 scheduler.1 | [2020-02-20 14:22:45,015] \{base_executor.py:123} DEBUG 
- 0 in queue
14:22:45 scheduler.1 | [2020-02-20 14:22:45,015] \{base_executor.py:124} DEBUG 
- 0 open slots
14:22:45 scheduler.1 | [2020-02-20 14:22:45,016] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method
14:22:47 scheduler.1 | [2020-02-20 14:22:47,200] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_17_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:47 scheduler.1 | [2020-02-20 14:22:47,200] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_16_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:47 scheduler.1 | [2020-02-20 14:22:47,201] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_15_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:47 scheduler.1 | [2020-02-20 14:22:47,202] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_14_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:47 scheduler.1 | [2020-02-20 14:22:47,202] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_13_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:47 scheduler.1 | [2020-02-20 14:22:47,203] \{base_executor.py:122} DEBUG 
- 8 running task instances
14:22:47 scheduler.1 | [2020-02-20 14:22:47,203] \{base_executor.py:123} DEBUG 
- 5 in queue
14:22:47 scheduler.1 | [2020-02-20 14:22:47,203] \{base_executor.py:124} DEBUG 
- 0 open slots
14:22:47 scheduler.1 | [2020-02-20 14:22:47,203] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method
14:22:47 scheduler.1 | [2020-02-20 14:22:47,204] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_4_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:47 scheduler.1 | [2020-02-20 14:22:47,205] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_19_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:49 scheduler.1 | [2020-02-20 14:22:49,128] \{base_executor.py:122} DEBUG 
- 6 running task instances
14:22:49 scheduler.1 | [2020-02-20 14:22:49,128] \{base_executor.py:123} DEBUG 
- 0 in queue
14:22:49 scheduler.1 | [2020-02-20 14:22:49,128] \{base_executor.py:124} DEBUG 
- 2 open slots
14:22:49 scheduler.1 | [2020-02-20 14:22:49,128] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method
14:22:49 scheduler.1 | [2020-02-20 14:22:49,129] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_1_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:49 scheduler.1 | [2020-02-20 14:22:49,130] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_3_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:49 scheduler.1 | [2020-02-20 14:22:49,131] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_6_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:51 scheduler.1 | [2020-02-20 14:22:51,136] \{base_executor.py:122} DEBUG 
- 3 running task instances
14:22:51 scheduler.1 | [2020-02-20 14:22:51,136] \{base_executor.py:123} DEBUG 
- 0 in queue
14:22:51 scheduler.1 | [2020-02-20 14:22:51,136] \{base_executor.py:124} DEBUG 
- 5 open slots
14:22:51 scheduler.1 | [2020-02-20 14:22:51,137] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method
14:22:53 scheduler.1 | [2020-02-20 14:22:53,104] \{base_executor.py:122} DEBUG 
- 3 running task instances
14:22:53 scheduler.1 | [2020-02-20 14:22:53,104] \{base_executor.py:123} DEBUG 
- 0 in queue
14:22:53 scheduler.1 | [2020-02-20 14:22:53,105] \{base_executor.py:124} DEBUG 
- 5 open slots
14:22:53 scheduler.1 | [2020-02-20 14:22:53,105] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method
14:22:53 scheduler.1 | [2020-02-20 14:22:53,105] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_5_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:55 scheduler.1 | [2020-02-20 14:22:55,111] \{base_executor.py:122} DEBUG 
- 2 running task instances
14:22:55 scheduler.1 | [2020-02-20 14:22:55,111] \{base_executor.py:123} DEBUG 
- 0 in queue
14:22:55 scheduler.1 | [2020-02-20 14:22:55,111] \{base_executor.py:124} DEBUG 
- 6 open slots
14:22:55 scheduler.1 | [2020-02-20 14:22:55,111] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method
14:22:55 scheduler.1 | [2020-02-20 14:22:55,112] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_10_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:57 scheduler.1 | [2020-02-20 14:22:57,263] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'load_dag_test_6_entity_5_2_45', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,264] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_17_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,265] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_16_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,265] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_15_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,266] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_14_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,266] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_13_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,267] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_12_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,267] \{base_executor.py:58} INFO - 
Adding to queue: ['airflow', 'run', 'DAG_TEST_6_5_20', 
'dev.DAG_TEST_6_ENTITY_11_1', '2020-02-20T14:18:01.743479+00:00', '--local', 
'--pool', 'default_pool', '-sd', '/airflow/dags/dag_DAG_TEST_6_5_20.py']
14:22:57 scheduler.1 | [2020-02-20 14:22:57,268] \{base_executor.py:122} DEBUG 
- 1 running task instances
14:22:57 scheduler.1 | [2020-02-20 14:22:57,268] \{base_executor.py:123} DEBUG 
- 8 in queue
14:22:57 scheduler.1 | [2020-02-20 14:22:57,268] \{base_executor.py:124} DEBUG 
- 7 open slots
14:22:57 scheduler.1 | [2020-02-20 14:22:57,281] \{local_executor.py:85} INFO - 
QueuedLocalWorker running
14:22:57 scheduler.1 | [2020-02-20 14:22:57,281] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method
14:22:57 scheduler.1 | [2020-02-20 14:22:57,282] \{base_executor.py:157} DEBUG 
- Changing state: ('DAG_TEST_6_5_20', 'dev.DAG_TEST_6_ENTITY_18_1', 
datetime.datetime(2020, 2, 20, 14, 18, 1, 743479, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>), 1)
14:22:59 scheduler.1 | [2020-02-20 14:22:59,124] \{base_executor.py:122} DEBUG 
- 7 running task instances
14:22:59 scheduler.1 | [2020-02-20 14:22:59,124] \{base_executor.py:123} DEBUG 
- 0 in queue
14:22:59 scheduler.1 | [2020-02-20 14:22:59,124] \{base_executor.py:124} DEBUG 
- 1 open slots
14:22:59 scheduler.1 | [2020-02-20 14:22:59,124] \{base_executor.py:133} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method

{code}
The issue is that the implementation clears the queue, and the order of 
operations is such that LocalExecutor may be kept idle for no reason.

For example:
 # 8 running task instances, 0 in queue, 0 open slots
 # 5 new tasks pushed to the queue
 # 2 tasks finish
 # 6 running task instances, 0 in queue, 2 open slots
 # 3 tasks finish
 # 3 running task instances, 0 in queue, 5 open slots
 # 1 task finishes
 # 2 running task instances, 0 in queue, 6 open slots
 # 8 new tasks pushed to the queue
 # 1 running task instance, 8 in queue, 7 open slots
 # 1 task finishes
 # 7 running task instances, 0 in queue, 1 open slot

So, depending on how often the DAG gets rescheduled, it may take a long time 
before LocalExecutor receives more tasks. It also seems that other DAGs may 
get scheduled just when some tasks finish and effectively take the place of 
the previously queued tasks, because those are always removed at the end of 
every scheduler loop.

Would it be beneficial to keep queued tasks in the LocalExecutor queue, so 
that LocalExecutor can start the next task as soon as a slot becomes 
available instead of waiting until the DAG is scheduled again?

Also, as seen in steps 1-4 and 9-12, the order of operations in the 
LocalExecutor heartbeat seems to mean that slots freed up during a scheduler 
loop are not all filled: only the slots that were already open before the 
current heartbeat get used, or at least that is how it looks.


> LocalExecutor queuing behaviour
> -------------------------------
>
>                 Key: AIRFLOW-7071
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-7071
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: scheduler
>    Affects Versions: 1.10.8
>            Reporter: Marko Kattelus
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
