[ https://issues.apache.org/jira/browse/AIRFLOW-7063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jarek Potiuk resolved AIRFLOW-7063.
-----------------------------------
Fix Version/s: 1.10.10
Resolution: Fixed
> dag.clear() slowness caused by multiple UNION statements and tis.count()
> ------------------------------------------------------------------------
>
> Key: AIRFLOW-7063
> URL: https://issues.apache.org/jira/browse/AIRFLOW-7063
> Project: Apache Airflow
> Issue Type: Improvement
> Components: webserver
> Affects Versions: 1.10.9
> Reporter: Qian Yu
> Assignee: Qian Yu
> Priority: Major
> Fix For: 1.10.10
>
>
> When multiple {{ExternalTaskMarker}} tasks are used, {{dag.clear()}} becomes
> very slow when clearing all of them together. The slowness turns out to come
> from this line of code in {{dag.clear()}}:
> {code:python}
> if dry_run:
>     tis = tis.all()
>     session.expunge_all()
>     return tis
> count = tis.count()  # <------- This line is the culprit
> do_it = True
> if count == 0:
>     return 0
> {code}
> The SQL generated by {{tis.count()}} when three {{ExternalTaskMarker}} tasks
> are cleared together is shown at the end of this description. Note that
> there is nothing wrong with the SQL itself: it is reasonably efficient when
> executed on Postgres even when the number of UNION statements is larger
> (e.g. 30 UNION statements take about 13 ms in the Docker container I started
> with Breeze). But it takes more than three minutes for SQLAlchemy to
> construct this count query before it goes to the database.
> The fix is really simple: get rid of the {{count()}} call and query all the
> entries from the database instead. The function becomes ten times faster when
> {{tis.count()}} is removed.
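
A minimal sketch of the change described above, assuming the standard-library {{sqlite3}} module as a stand-in for the SQLAlchemy session that Airflow actually uses. The reduced table layout, the {{make_demo_db}} and {{clear_task_instances}} functions, and the state reset are hypothetical simplifications, not Airflow's code. The only point it illustrates is fetching the rows once and taking {{len()}} of the result instead of issuing a separate count query.

```python
import sqlite3


def make_demo_db():
    """Build a tiny in-memory table (hypothetical, much smaller than Airflow's)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance (task_id TEXT, dag_id TEXT, state TEXT)"
    )
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?, ?)",
        [
            ("start", "agg_dag", "success"),
            ("daily_tas_0", "agg_dag", "failed"),
            ("daily_tas", "daily_dag", "success"),
        ],
    )
    conn.commit()
    return conn


def clear_task_instances(conn, dag_id, dry_run=False):
    """Fetch the matching rows once and reuse them for the count."""
    rows = conn.execute(
        "SELECT task_id, dag_id, state FROM task_instance WHERE dag_id = ?",
        (dag_id,),
    ).fetchall()
    if dry_run:
        return rows

    # len() on the fetched rows replaces the separate count query.
    count = len(rows)
    if count == 0:
        return 0

    # Simplified placeholder for the real clearing step.
    conn.execute(
        "UPDATE task_instance SET state = NULL WHERE dag_id = ?", (dag_id,)
    )
    conn.commit()
    return count
```

The trade-off is that every matching row is loaded into memory, which the description above accepts in exchange for not building the count query.
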
> There are multiple places where people complain about similar problems with
> SQLAlchemy's {{count()}} being slower than the query itself. It does not look
> like SQLAlchemy is going to fix this issue:
>
> [https://stackoverflow.com/questions/14754994/why-is-sqlalchemy-count-much-slower-than-the-raw-query]
> [https://gist.github.com/hest/8798884]
>
> {code:sql}
> [2020-03-14 09:42:50,264] {base.py:1203} INFO - SELECT count(*) AS count_1
> FROM (SELECT anon_2.anon_3_anon_4_task_instance_try_number AS
> anon_2_anon_3_anon_4_task_instance_try_number,
> anon_2.anon_3_anon_4_task_instance_task_id AS
> anon_2_anon_3_anon_4_task_instance_task_id,
> anon_2.anon_3_anon_4_task_instance_dag_id AS
> anon_2_anon_3_anon_4_task_instance_dag_id,
> anon_2.anon_3_anon_4_task_instance_execution_date AS
> anon_2_anon_3_anon_4_task_instance_execution_date,
> anon_2.anon_3_anon_4_task_instance_start_date AS
> anon_2_anon_3_anon_4_task_instance_start_date,
> anon_2.anon_3_anon_4_task_instance_end_date AS
> anon_2_anon_3_anon_4_task_instance_end_date,
> anon_2.anon_3_anon_4_task_instance_duration AS
> anon_2_anon_3_anon_4_task_instance_duration,
> anon_2.anon_3_anon_4_task_instance_state AS
> anon_2_anon_3_anon_4_task_instance_state,
> anon_2.anon_3_anon_4_task_instance_max_tries AS
> anon_2_anon_3_anon_4_task_instance_max_tries,
> anon_2.anon_3_anon_4_task_instance_hostname AS
> anon_2_anon_3_anon_4_task_instance_hostname,
> anon_2.anon_3_anon_4_task_instance_unixname AS
> anon_2_anon_3_anon_4_task_instance_unixname,
> anon_2.anon_3_anon_4_task_instance_job_id AS
> anon_2_anon_3_anon_4_task_instance_job_id,
> anon_2.anon_3_anon_4_task_instance_pool AS
> anon_2_anon_3_anon_4_task_instance_pool,
> anon_2.anon_3_anon_4_task_instance_pool_slots AS
> anon_2_anon_3_anon_4_task_instance_pool_slots,
> anon_2.anon_3_anon_4_task_instance_queue AS
> anon_2_anon_3_anon_4_task_instance_queue,
> anon_2.anon_3_anon_4_task_instance_priority_weight AS
> anon_2_anon_3_anon_4_task_instance_priority_weight,
> anon_2.anon_3_anon_4_task_instance_operator AS
> anon_2_anon_3_anon_4_task_instance_operator,
> anon_2.anon_3_anon_4_task_instance_queued_dttm AS
> anon_2_anon_3_anon_4_task_instance_queued_dttm,
> anon_2.anon_3_anon_4_task_instance_pid AS
> anon_2_anon_3_anon_4_task_instance_pid,
> anon_2.anon_3_anon_4_task_instance_executor_config AS
> anon_2_anon_3_anon_4_task_instance_executor_config
> FROM (SELECT anon_3.anon_4_task_instance_try_number AS
> anon_3_anon_4_task_instance_try_number, anon_3.anon_4_task_instance_task_id
> AS anon_3_anon_4_task_instance_task_id, anon_3.anon_4_task_instance_dag_id AS
> anon_3_anon_4_task_instance_dag_id,
> anon_3.anon_4_task_instance_execution_date AS
> anon_3_anon_4_task_instance_execution_date,
> anon_3.anon_4_task_instance_start_date AS
> anon_3_anon_4_task_instance_start_date, anon_3.anon_4_task_instance_end_date
> AS anon_3_anon_4_task_instance_end_date, anon_3.anon_4_task_instance_duration
> AS anon_3_anon_4_task_instance_duration, anon_3.anon_4_task_instance_state AS
> anon_3_anon_4_task_instance_state, anon_3.anon_4_task_instance_max_tries AS
> anon_3_anon_4_task_instance_max_tries, anon_3.anon_4_task_instance_hostname
> AS anon_3_anon_4_task_instance_hostname, anon_3.anon_4_task_instance_unixname
> AS anon_3_anon_4_task_instance_unixname, anon_3.anon_4_task_instance_job_id
> AS anon_3_anon_4_task_instance_job_id, anon_3.anon_4_task_instance_pool AS
> anon_3_anon_4_task_instance_pool, anon_3.anon_4_task_instance_pool_slots AS
> anon_3_anon_4_task_instance_pool_slots, anon_3.anon_4_task_instance_queue AS
> anon_3_anon_4_task_instance_queue,
> anon_3.anon_4_task_instance_priority_weight AS
> anon_3_anon_4_task_instance_priority_weight,
> anon_3.anon_4_task_instance_operator AS anon_3_anon_4_task_instance_operator,
> anon_3.anon_4_task_instance_queued_dttm AS
> anon_3_anon_4_task_instance_queued_dttm, anon_3.anon_4_task_instance_pid AS
> anon_3_anon_4_task_instance_pid, anon_3.anon_4_task_instance_executor_config
> AS anon_3_anon_4_task_instance_executor_config
> FROM (SELECT anon_4.task_instance_try_number AS
> anon_4_task_instance_try_number, anon_4.task_instance_task_id AS
> anon_4_task_instance_task_id, anon_4.task_instance_dag_id AS
> anon_4_task_instance_dag_id, anon_4.task_instance_execution_date AS
> anon_4_task_instance_execution_date, anon_4.task_instance_start_date AS
> anon_4_task_instance_start_date, anon_4.task_instance_end_date AS
> anon_4_task_instance_end_date, anon_4.task_instance_duration AS
> anon_4_task_instance_duration, anon_4.task_instance_state AS
> anon_4_task_instance_state, anon_4.task_instance_max_tries AS
> anon_4_task_instance_max_tries, anon_4.task_instance_hostname AS
> anon_4_task_instance_hostname, anon_4.task_instance_unixname AS
> anon_4_task_instance_unixname, anon_4.task_instance_job_id AS
> anon_4_task_instance_job_id, anon_4.task_instance_pool AS
> anon_4_task_instance_pool, anon_4.task_instance_pool_slots AS
> anon_4_task_instance_pool_slots, anon_4.task_instance_queue AS
> anon_4_task_instance_queue, anon_4.task_instance_priority_weight AS
> anon_4_task_instance_priority_weight, anon_4.task_instance_operator AS
> anon_4_task_instance_operator, anon_4.task_instance_queued_dttm AS
> anon_4_task_instance_queued_dttm, anon_4.task_instance_pid AS
> anon_4_task_instance_pid, anon_4.task_instance_executor_config AS
> anon_4_task_instance_executor_config
> FROM (SELECT task_instance.try_number AS task_instance_try_number,
> task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS
> task_instance_dag_id, task_instance.execution_date AS
> task_instance_execution_date, task_instance.start_date AS
> task_instance_start_date, task_instance.end_date AS task_instance_end_date,
> task_instance.duration AS task_instance_duration, task_instance.state AS
> task_instance_state, task_instance.max_tries AS task_instance_max_tries,
> task_instance.hostname AS task_instance_hostname, task_instance.unixname AS
> task_instance_unixname, task_instance.job_id AS task_instance_job_id,
> task_instance.pool AS task_instance_pool, task_instance.pool_slots AS
> task_instance_pool_slots, task_instance.queue AS task_instance_queue,
> task_instance.priority_weight AS task_instance_priority_weight,
> task_instance.operator AS task_instance_operator, task_instance.queued_dttm
> AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid,
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_1)s AND task_instance.task_id IN
> (%(task_id_1)s, %(task_id_2)s, %(task_id_3)s, %(task_id_4)s) AND
> task_instance.execution_date >= %(execution_date_1)s AND
> task_instance.execution_date <= %(execution_date_2)s UNION SELECT
> task_instance.try_number AS task_instance_try_number, task_instance.task_id
> AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id,
> task_instance.execution_date AS task_instance_execution_date,
> task_instance.start_date AS task_instance_start_date, task_instance.end_date
> AS task_instance_end_date, task_instance.duration AS task_instance_duration,
> task_instance.state AS task_instance_state, task_instance.max_tries AS
> task_instance_max_tries, task_instance.hostname AS task_instance_hostname,
> task_instance.unixname AS task_instance_unixname, task_instance.job_id AS
> task_instance_job_id, task_instance.pool AS task_instance_pool,
> task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS
> task_instance_queue, task_instance.priority_weight AS
> task_instance_priority_weight, task_instance.operator AS
> task_instance_operator, task_instance.queued_dttm AS
> task_instance_queued_dttm, task_instance.pid AS task_instance_pid,
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_2)s AND task_instance.task_id IN
> (%(task_id_5)s) AND task_instance.execution_date >= %(execution_date_3)s AND
> task_instance.execution_date <= %(execution_date_4)s) AS anon_4 UNION SELECT
> task_instance.try_number AS task_instance_try_number, task_instance.task_id
> AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id,
> task_instance.execution_date AS task_instance_execution_date,
> task_instance.start_date AS task_instance_start_date, task_instance.end_date
> AS task_instance_end_date, task_instance.duration AS task_instance_duration,
> task_instance.state AS task_instance_state, task_instance.max_tries AS
> task_instance_max_tries, task_instance.hostname AS task_instance_hostname,
> task_instance.unixname AS task_instance_unixname, task_instance.job_id AS
> task_instance_job_id, task_instance.pool AS task_instance_pool,
> task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS
> task_instance_queue, task_instance.priority_weight AS
> task_instance_priority_weight, task_instance.operator AS
> task_instance_operator, task_instance.queued_dttm AS
> task_instance_queued_dttm, task_instance.pid AS task_instance_pid,
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_3)s AND task_instance.task_id IN
> (%(task_id_6)s) AND task_instance.execution_date >= %(execution_date_5)s AND
> task_instance.execution_date <= %(execution_date_6)s) AS anon_3 UNION SELECT
> task_instance.try_number AS task_instance_try_number, task_instance.task_id
> AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id,
> task_instance.execution_date AS task_instance_execution_date,
> task_instance.start_date AS task_instance_start_date, task_instance.end_date
> AS task_instance_end_date, task_instance.duration AS task_instance_duration,
> task_instance.state AS task_instance_state, task_instance.max_tries AS
> task_instance_max_tries, task_instance.hostname AS task_instance_hostname,
> task_instance.unixname AS task_instance_unixname, task_instance.job_id AS
> task_instance_job_id, task_instance.pool AS task_instance_pool,
> task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS
> task_instance_queue, task_instance.priority_weight AS
> task_instance_priority_weight, task_instance.operator AS
> task_instance_operator, task_instance.queued_dttm AS
> task_instance_queued_dttm, task_instance.pid AS task_instance_pid,
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_4)s AND task_instance.task_id IN
> (%(task_id_7)s) AND task_instance.execution_date >= %(execution_date_7)s AND
> task_instance.execution_date <= %(execution_date_8)s) AS anon_2) AS anon_1
> [2020-03-14 09:42:50,265] {base.py:1208} INFO - "\x1b[1m{'dag_id_1':
> 'agg_dag', 'task_id_1': 'start', 'task_id_2': 'daily_tas_0', 'task_id_3':
> 'daily_tas_1', 'task_id_4': 'daily_tas_2', 'execution_date_1':
> datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC")),
> 'execution_date_2': datetime.datetime(2015, 1, 4, 0, 0,
> tzinfo=pendulum.timezone("UTC")), 'dag_id_2': 'daily_dag', 'task_id_5':
> 'daily_tas', 'execution_date_3': datetime.datetime(2015, 1, 3, 0, 0,
> tzinfo=pendulum.timezone("UTC")), 'execution_date_4': datetime.datetime(2015,
> 1, 3, 0, 0, tzinfo=pendulum.timezone("UTC")), 'dag_id_3': 'daily_dag',
> 'task_id_6': 'daily_tas', 'execution_date_5': datetime.datetime(2015, 1, 2,
> 0, 0, tzinfo=pendulum.timezone("UTC")), 'execution_date_6':
> datetime.datetime(2015, 1, 2, 0, 0, tzinfo=pendulum.timezone("UTC")),
> 'dag_id_4': 'daily_dag', 'task_id_7': 'daily_tas', 'execution_date_7':
> datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC")),
> 'execution_date_8': datetime.datetime(2015, 1, 4, 0, 0,
> tzinfo=pendulum.timezone("UTC"))}\x1b[0m"
> {code}
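
The column labels in the logged statement follow a visible pattern: each UNION wraps the previous query in a subquery and prefixes every column label with the new alias. The sketch below reproduces that naming for a given nesting depth. It is written from the log output alone and is not SQLAlchemy's implementation; {{nested_label}} and its parameters are hypothetical names, and the alias numbering for deeper nesting is an assumption extrapolated from the three-level case in the log.

```python
def nested_label(column, depth, table="task_instance"):
    """Return the label a column carries after `depth` levels of wrapping.

    Mirrors the naming seen in the logged SQL, where the outermost wrapper
    around the columns is anon_2 and the innermost is anon_{depth + 1}.
    """
    label = f"{table}_{column}"
    # Apply the innermost alias first, then work outwards to anon_2.
    for alias_number in range(depth + 1, 1, -1):
        label = f"anon_{alias_number}_{label}"
    return label


if __name__ == "__main__":
    for depth in (0, 1, 3):
        print(depth, nested_label("try_number", depth))
```

With a depth of 3 this yields the first label in the log above. Each additional UNION lengthens the label of each of the 20 selected columns by one more alias prefix.
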
--
This message was sent by Atlassian Jira
(v8.3.4#803005)