[ 
https://issues.apache.org/jira/browse/AIRFLOW-7063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jarek Potiuk resolved AIRFLOW-7063.
-----------------------------------
    Fix Version/s: 1.10.10
       Resolution: Fixed

> dag.clear() slowness caused by multiple UNION statements and tis.count()
> ------------------------------------------------------------------------
>
>                 Key: AIRFLOW-7063
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-7063
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: webserver
>    Affects Versions: 1.10.9
>            Reporter: Qian Yu
>            Assignee: Qian Yu
>            Priority: Major
>             Fix For: 1.10.10
>
>
> When multiple {{ExternalTaskMarker}} are used, {{dag.clear()}} becomes very 
> slow when clearing all the {{ExternalTaskMarker}} together. The slowness 
> turns out to come from this line of code in {{dag.clear()}}:
> {code:python}
>         if dry_run:
>             tis = tis.all()
>             session.expunge_all()
>             return tis
>         count = tis.count()   <------- This line is the culprit
>         do_it = True
>         if count == 0:
>             return 0
> {code}
> This is the sql generated by {{tis.count()}} when there are three 
> {{ExternalTaskMarker}} being cleared together. Note there's nothing wrong 
> with the sql and it's reasonably efficient when executed on postgres even 
> when the number of UNION statements is bigger (e.g. 30 UNION statements takes 
> about 13ms in the docker container I started with breeze)
>  But it takes more than three minutes for sqlalchemy to construct this count 
> query before it goes to the database.
> The fix is really simple, just get rid of the count() and query all the 
> entries from the db instead. The function becomes ten times faster when 
> {{tis.count()}} is removed.
>  There are multiple places people are complaining about similar problems with 
> sqlalchemy count() being slower than the query itself. It does not look like 
> sqlalchemy is going to fix this issue:
>  
> [https://stackoverflow.com/questions/14754994/why-is-sqlalchemy-count-much-slower-than-the-raw-query]
>  [https://gist.github.com/hest/8798884]
>  
> {code:sql}
> [2020-03-14 09:42:50,264] {base.py:1203} INFO - SELECT count(*) AS count_1
> FROM (SELECT anon_2.anon_3_anon_4_task_instance_try_number AS 
> anon_2_anon_3_anon_4_task_instance_try_number, 
> anon_2.anon_3_anon_4_task_instance_task_id AS 
> anon_2_anon_3_anon_4_task_instance_task_id, 
> anon_2.anon_3_anon_4_task_instance_dag_id AS 
> anon_2_anon_3_anon_4_task_instance_dag_id, 
> anon_2.anon_3_anon_4_task_instance_execution_date AS 
> anon_2_anon_3_anon_4_task_instance_execution_date, 
> anon_2.anon_3_anon_4_task_instance_start_date AS 
> anon_2_anon_3_anon_4_task_instance_start_date, 
> anon_2.anon_3_anon_4_task_instance_end_date AS 
> anon_2_anon_3_anon_4_task_instance_end_date, 
> anon_2.anon_3_anon_4_task_instance_duration AS 
> anon_2_anon_3_anon_4_task_instance_duration, 
> anon_2.anon_3_anon_4_task_instance_state AS 
> anon_2_anon_3_anon_4_task_instance_state, 
> anon_2.anon_3_anon_4_task_instance_max_tries AS 
> anon_2_anon_3_anon_4_task_instance_max_tries, 
> anon_2.anon_3_anon_4_task_instance_hostname AS 
> anon_2_anon_3_anon_4_task_instance_hostname, 
> anon_2.anon_3_anon_4_task_instance_unixname AS 
> anon_2_anon_3_anon_4_task_instance_unixname, 
> anon_2.anon_3_anon_4_task_instance_job_id AS 
> anon_2_anon_3_anon_4_task_instance_job_id, 
> anon_2.anon_3_anon_4_task_instance_pool AS 
> anon_2_anon_3_anon_4_task_instance_pool, 
> anon_2.anon_3_anon_4_task_instance_pool_slots AS 
> anon_2_anon_3_anon_4_task_instance_pool_slots, 
> anon_2.anon_3_anon_4_task_instance_queue AS 
> anon_2_anon_3_anon_4_task_instance_queue, 
> anon_2.anon_3_anon_4_task_instance_priority_weight AS 
> anon_2_anon_3_anon_4_task_instance_priority_weight, 
> anon_2.anon_3_anon_4_task_instance_operator AS 
> anon_2_anon_3_anon_4_task_instance_operator, 
> anon_2.anon_3_anon_4_task_instance_queued_dttm AS 
> anon_2_anon_3_anon_4_task_instance_queued_dttm, 
> anon_2.anon_3_anon_4_task_instance_pid AS 
> anon_2_anon_3_anon_4_task_instance_pid, 
> anon_2.anon_3_anon_4_task_instance_executor_config AS 
> anon_2_anon_3_anon_4_task_instance_executor_config
> FROM (SELECT anon_3.anon_4_task_instance_try_number AS 
> anon_3_anon_4_task_instance_try_number, anon_3.anon_4_task_instance_task_id 
> AS anon_3_anon_4_task_instance_task_id, anon_3.anon_4_task_instance_dag_id AS 
> anon_3_anon_4_task_instance_dag_id, 
> anon_3.anon_4_task_instance_execution_date AS 
> anon_3_anon_4_task_instance_execution_date, 
> anon_3.anon_4_task_instance_start_date AS 
> anon_3_anon_4_task_instance_start_date, anon_3.anon_4_task_instance_end_date 
> AS anon_3_anon_4_task_instance_end_date, anon_3.anon_4_task_instance_duration 
> AS anon_3_anon_4_task_instance_duration, anon_3.anon_4_task_instance_state AS 
> anon_3_anon_4_task_instance_state, anon_3.anon_4_task_instance_max_tries AS 
> anon_3_anon_4_task_instance_max_tries, anon_3.anon_4_task_instance_hostname 
> AS anon_3_anon_4_task_instance_hostname, anon_3.anon_4_task_instance_unixname 
> AS anon_3_anon_4_task_instance_unixname, anon_3.anon_4_task_instance_job_id 
> AS anon_3_anon_4_task_instance_job_id, anon_3.anon_4_task_instance_pool AS 
> anon_3_anon_4_task_instance_pool, anon_3.anon_4_task_instance_pool_slots AS 
> anon_3_anon_4_task_instance_pool_slots, anon_3.anon_4_task_instance_queue AS 
> anon_3_anon_4_task_instance_queue, 
> anon_3.anon_4_task_instance_priority_weight AS 
> anon_3_anon_4_task_instance_priority_weight, 
> anon_3.anon_4_task_instance_operator AS anon_3_anon_4_task_instance_operator, 
> anon_3.anon_4_task_instance_queued_dttm AS 
> anon_3_anon_4_task_instance_queued_dttm, anon_3.anon_4_task_instance_pid AS 
> anon_3_anon_4_task_instance_pid, anon_3.anon_4_task_instance_executor_config 
> AS anon_3_anon_4_task_instance_executor_config
> FROM (SELECT anon_4.task_instance_try_number AS 
> anon_4_task_instance_try_number, anon_4.task_instance_task_id AS 
> anon_4_task_instance_task_id, anon_4.task_instance_dag_id AS 
> anon_4_task_instance_dag_id, anon_4.task_instance_execution_date AS 
> anon_4_task_instance_execution_date, anon_4.task_instance_start_date AS 
> anon_4_task_instance_start_date, anon_4.task_instance_end_date AS 
> anon_4_task_instance_end_date, anon_4.task_instance_duration AS 
> anon_4_task_instance_duration, anon_4.task_instance_state AS 
> anon_4_task_instance_state, anon_4.task_instance_max_tries AS 
> anon_4_task_instance_max_tries, anon_4.task_instance_hostname AS 
> anon_4_task_instance_hostname, anon_4.task_instance_unixname AS 
> anon_4_task_instance_unixname, anon_4.task_instance_job_id AS 
> anon_4_task_instance_job_id, anon_4.task_instance_pool AS 
> anon_4_task_instance_pool, anon_4.task_instance_pool_slots AS 
> anon_4_task_instance_pool_slots, anon_4.task_instance_queue AS 
> anon_4_task_instance_queue, anon_4.task_instance_priority_weight AS 
> anon_4_task_instance_priority_weight, anon_4.task_instance_operator AS 
> anon_4_task_instance_operator, anon_4.task_instance_queued_dttm AS 
> anon_4_task_instance_queued_dttm, anon_4.task_instance_pid AS 
> anon_4_task_instance_pid, anon_4.task_instance_executor_config AS 
> anon_4_task_instance_executor_config
> FROM (SELECT task_instance.try_number AS task_instance_try_number, 
> task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS 
> task_instance_dag_id, task_instance.execution_date AS 
> task_instance_execution_date, task_instance.start_date AS 
> task_instance_start_date, task_instance.end_date AS task_instance_end_date, 
> task_instance.duration AS task_instance_duration, task_instance.state AS 
> task_instance_state, task_instance.max_tries AS task_instance_max_tries, 
> task_instance.hostname AS task_instance_hostname, task_instance.unixname AS 
> task_instance_unixname, task_instance.job_id AS task_instance_job_id, 
> task_instance.pool AS task_instance_pool, task_instance.pool_slots AS 
> task_instance_pool_slots, task_instance.queue AS task_instance_queue, 
> task_instance.priority_weight AS task_instance_priority_weight, 
> task_instance.operator AS task_instance_operator, task_instance.queued_dttm 
> AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, 
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_1)s AND task_instance.task_id IN 
> (%(task_id_1)s, %(task_id_2)s, %(task_id_3)s, %(task_id_4)s) AND 
> task_instance.execution_date >= %(execution_date_1)s AND 
> task_instance.execution_date <= %(execution_date_2)s UNION SELECT 
> task_instance.try_number AS task_instance_try_number, task_instance.task_id 
> AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, 
> task_instance.execution_date AS task_instance_execution_date, 
> task_instance.start_date AS task_instance_start_date, task_instance.end_date 
> AS task_instance_end_date, task_instance.duration AS task_instance_duration, 
> task_instance.state AS task_instance_state, task_instance.max_tries AS 
> task_instance_max_tries, task_instance.hostname AS task_instance_hostname, 
> task_instance.unixname AS task_instance_unixname, task_instance.job_id AS 
> task_instance_job_id, task_instance.pool AS task_instance_pool, 
> task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS 
> task_instance_queue, task_instance.priority_weight AS 
> task_instance_priority_weight, task_instance.operator AS 
> task_instance_operator, task_instance.queued_dttm AS 
> task_instance_queued_dttm, task_instance.pid AS task_instance_pid, 
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_2)s AND task_instance.task_id IN 
> (%(task_id_5)s) AND task_instance.execution_date >= %(execution_date_3)s AND 
> task_instance.execution_date <= %(execution_date_4)s) AS anon_4 UNION SELECT 
> task_instance.try_number AS task_instance_try_number, task_instance.task_id 
> AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, 
> task_instance.execution_date AS task_instance_execution_date, 
> task_instance.start_date AS task_instance_start_date, task_instance.end_date 
> AS task_instance_end_date, task_instance.duration AS task_instance_duration, 
> task_instance.state AS task_instance_state, task_instance.max_tries AS 
> task_instance_max_tries, task_instance.hostname AS task_instance_hostname, 
> task_instance.unixname AS task_instance_unixname, task_instance.job_id AS 
> task_instance_job_id, task_instance.pool AS task_instance_pool, 
> task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS 
> task_instance_queue, task_instance.priority_weight AS 
> task_instance_priority_weight, task_instance.operator AS 
> task_instance_operator, task_instance.queued_dttm AS 
> task_instance_queued_dttm, task_instance.pid AS task_instance_pid, 
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_3)s AND task_instance.task_id IN 
> (%(task_id_6)s) AND task_instance.execution_date >= %(execution_date_5)s AND 
> task_instance.execution_date <= %(execution_date_6)s) AS anon_3 UNION SELECT 
> task_instance.try_number AS task_instance_try_number, task_instance.task_id 
> AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, 
> task_instance.execution_date AS task_instance_execution_date, 
> task_instance.start_date AS task_instance_start_date, task_instance.end_date 
> AS task_instance_end_date, task_instance.duration AS task_instance_duration, 
> task_instance.state AS task_instance_state, task_instance.max_tries AS 
> task_instance_max_tries, task_instance.hostname AS task_instance_hostname, 
> task_instance.unixname AS task_instance_unixname, task_instance.job_id AS 
> task_instance_job_id, task_instance.pool AS task_instance_pool, 
> task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS 
> task_instance_queue, task_instance.priority_weight AS 
> task_instance_priority_weight, task_instance.operator AS 
> task_instance_operator, task_instance.queued_dttm AS 
> task_instance_queued_dttm, task_instance.pid AS task_instance_pid, 
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_4)s AND task_instance.task_id IN 
> (%(task_id_7)s) AND task_instance.execution_date >= %(execution_date_7)s AND 
> task_instance.execution_date <= %(execution_date_8)s) AS anon_2) AS anon_1
> [2020-03-14 09:42:50,265] {base.py:1208} INFO - "\x1b[1m{'dag_id_1': 
> 'agg_dag', 'task_id_1': 'start', 'task_id_2': 'daily_tas_0', 'task_id_3': 
> 'daily_tas_1', 'task_id_4': 'daily_tas_2', 'execution_date_1': 
> datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC")), 
> 'execution_date_2': datetime.datetime(2015, 1, 4, 0, 0, 
> tzinfo=pendulum.timezone("UTC")), 'dag_id_2': 'daily_dag', 'task_id_5': 
> 'daily_tas', 'execution_date_3': datetime.datetime(2015, 1, 3, 0, 0, 
> tzinfo=pendulum.timezone("UTC")), 'execution_date_4': datetime.datetime(2015, 
> 1, 3, 0, 0, tzinfo=pendulum.timezone("UTC")), 'dag_id_3': 'daily_dag', 
> 'task_id_6': 'daily_tas', 'execution_date_5': datetime.datetime(2015, 1, 2, 
> 0, 0, tzinfo=pendulum.timezone("UTC")), 'execution_date_6': 
> datetime.datetime(2015, 1, 2, 0, 0, tzinfo=pendulum.timezone("UTC")), 
> 'dag_id_4': 'daily_dag', 'task_id_7': 'daily_tas', 'execution_date_7': 
> datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC")), 
> 'execution_date_8': datetime.datetime(2015, 1, 4, 0, 0, 
> tzinfo=pendulum.timezone("UTC"))}\x1b[0m"
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to