[
https://issues.apache.org/jira/browse/AIRFLOW-7063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17059319#comment-17059319
]
Qian Yu commented on AIRFLOW-7063:
----------------------------------
Here's a self-contained test to reproduce this slowness.
{code:python}
def test_clear_multiple_external_task_marker():
dag_bag = DagBag(dag_folder=DEV_NULL, include_examples=False)
daily_dag = DAG("daily_dag", start_date=DEFAULT_DATE,
schedule_interval="@daily")
agg_dag = DAG("agg_dag", start_date=DEFAULT_DATE,
schedule_interval="@daily")
dag_bag.bag_dag(daily_dag, None, daily_dag)
dag_bag.bag_dag(agg_dag, None, agg_dag)
daily_task = DummyOperator(task_id="daily_tas", dag=daily_dag)
start = DummyOperator(task_id="start", dag=agg_dag)
number_union = 20
for i in range(number_union):
task = ExternalTaskMarker(task_id=f"{daily_task.task_id}_{i}",
external_dag_id=daily_dag.dag_id,
external_task_id=daily_task.task_id,
execution_date="{{ macros.ds_add(ds, -1 * %s)
}}" % i,
dag=agg_dag)
start >> task
for delta in range(len(agg_dag.tasks)):
execution_date = DEFAULT_DATE + timedelta(days=delta)
for dag in dag_bag.dags.values():
for task in dag.tasks:
ti = TaskInstance(task=task, execution_date=execution_date)
ti.run()
agg_dag.clear(start_date=execution_date, end_date=execution_date,
dag_bag=dag_bag)
{code}
This is the time it takes to call {{dag.clear()}} before and after
{{tis.count()}} is removed:
With {{tis.count()}}, it gets much slower when more {{ExternalTaskMarker}} are
used:
{code}
number_union: 1, dag.clear() takes: 0.05681347846984863s
number_union: 2, dag.clear() takes: 0.06906294822692871s
number_union: 3, dag.clear() takes: 0.0995175838470459s
number_union: 4, dag.clear() takes: 0.1320326328277588s
number_union: 5, dag.clear() takes: 0.15970063209533691s
number_union: 6, dag.clear() takes: 0.19741106033325195s
number_union: 7, dag.clear() takes: 0.22883057594299316s
number_union: 8, dag.clear() takes: 0.2697880268096924s
number_union: 9, dag.clear() takes: 0.29255032539367676s
number_union: 10, dag.clear() takes: 0.33300089836120605s
number_union: 11, dag.clear() takes: 0.37502217292785645s
number_union: 12, dag.clear() takes: 0.4150240421295166s
number_union: 13, dag.clear() takes: 0.5335328578948975s
number_union: 14, dag.clear() takes: 0.5695874691009521s
number_union: 15, dag.clear() takes: 0.652968168258667s
number_union: 16, dag.clear() takes: 0.951585054397583s
number_union: 17, dag.clear() takes: 1.356891393661499s
number_union: 18, dag.clear() takes: 1.9704437255859375s
number_union: 19, dag.clear() takes: 3.2505640983581543s
number_union: 20, dag.clear() takes: 5.810389757156372s
number_union: 21, dag.clear() takes: 11.054765701293945s
number_union: 22, dag.clear() takes: 24.731156826019287s
number_union: 23, dag.clear() takes: 41.684504985809326s
number_union: 24, dag.clear() takes: 82.07931756973267s
number_union: 25, dag.clear() takes: 164.3687846660614s
{code}
After replacing {{tis.count()}} with {{len(tis.all()):}}. Performance of
{{dag.clear()}} is much more reasonable and about linear wrt the number of
{{ExternalTaskMarker}}:
{code}
number_union: 1, dag.clear() takes: 0.04764533042907715s
number_union: 2, dag.clear() takes: 0.06222081184387207s
number_union: 3, dag.clear() takes: 0.08550524711608887s
number_union: 4, dag.clear() takes: 0.1136934757232666s
number_union: 5, dag.clear() takes: 0.14034652709960938s
number_union: 6, dag.clear() takes: 0.18161964416503906s
number_union: 7, dag.clear() takes: 0.20060372352600098s
number_union: 8, dag.clear() takes: 0.24526739120483398s
number_union: 9, dag.clear() takes: 0.2750375270843506s
number_union: 10, dag.clear() takes: 0.29474949836730957s
number_union: 11, dag.clear() takes: 0.37499213218688965s
number_union: 12, dag.clear() takes: 0.35225915908813477s
number_union: 13, dag.clear() takes: 0.3956146240234375s
number_union: 14, dag.clear() takes: 0.45566248893737793s
number_union: 15, dag.clear() takes: 0.45056653022766113s
number_union: 16, dag.clear() takes: 0.5372872352600098s
number_union: 17, dag.clear() takes: 0.5812375545501709s
number_union: 18, dag.clear() takes: 0.5535945892333984s
number_union: 19, dag.clear() takes: 0.6553409099578857s
number_union: 20, dag.clear() takes: 0.7062091827392578s
number_union: 21, dag.clear() takes: 0.7340657711029053s
number_union: 22, dag.clear() takes: 0.7563188076019287s
number_union: 23, dag.clear() takes: 0.826127290725708s
number_union: 24, dag.clear() takes: 0.8763349056243896s
number_union: 25, dag.clear() takes: 0.903679370880127s
{code}
This example is using {{ExternalTaskMarker}}, I believe the same slowness will
happen if a lot of nested {{SubDagOperator}} are used too (although nesting
many levels of {{SubDagOperator}} makes less sense)
> dag.clear() slowness caused by multiple UNION statements and tis.count()
> ------------------------------------------------------------------------
>
> Key: AIRFLOW-7063
> URL: https://issues.apache.org/jira/browse/AIRFLOW-7063
> Project: Apache Airflow
> Issue Type: Improvement
> Components: webserver
> Affects Versions: 1.10.9
> Reporter: Qian Yu
> Assignee: Qian Yu
> Priority: Major
>
> When multiple {{ExternalTaskMarker}} are used, {{dag.clear()}} becomes very
> slow when clearing all the {{ExternalTaskMarker}} together. The slowness
> turns out to come from this line of code in {{dag.clear()}}:
> {code:python}
> if dry_run:
> tis = tis.all()
> session.expunge_all()
> return tis
> count = tis.count() <------- This line is the culprit
> do_it = True
> if count == 0:
> return 0
> {code}
> This is the sql generated by {{tis.count()}} when there are three
> {{ExternalTaskMarker}} being cleared together. Note there's nothing wrong
> with the sql and it's reasonably efficient when executed on postgres even
> when the number of UNION statements is bigger (e.g. 30 UNION statements takes
> about 13ms in the docker container I started with breeze)
> But it takes more than three minutes for sqlalchemy to construct this count
> query before it goes to the database.
> The fix is really simple, just get rid of the count() and query all the
> entries from the db instead. The function becomes ten times faster when
> {{tis.count()}} is removed.
> There are multiple places people are complaining about similar problems with
> sqlalchemy count() being slower than the query itself. It does not look like
> sqlalchemy is going to fix this issue:
>
> [https://stackoverflow.com/questions/14754994/why-is-sqlalchemy-count-much-slower-than-the-raw-query]
> [https://gist.github.com/hest/8798884]
>
> {code:sql}
> [2020-03-14 09:42:50,264] {base.py:1203} INFO - SELECT count(*) AS count_1
> FROM (SELECT anon_2.anon_3_anon_4_task_instance_try_number AS
> anon_2_anon_3_anon_4_task_instance_try_number,
> anon_2.anon_3_anon_4_task_instance_task_id AS
> anon_2_anon_3_anon_4_task_instance_task_id,
> anon_2.anon_3_anon_4_task_instance_dag_id AS
> anon_2_anon_3_anon_4_task_instance_dag_id,
> anon_2.anon_3_anon_4_task_instance_execution_date AS
> anon_2_anon_3_anon_4_task_instance_execution_date,
> anon_2.anon_3_anon_4_task_instance_start_date AS
> anon_2_anon_3_anon_4_task_instance_start_date,
> anon_2.anon_3_anon_4_task_instance_end_date AS
> anon_2_anon_3_anon_4_task_instance_end_date,
> anon_2.anon_3_anon_4_task_instance_duration AS
> anon_2_anon_3_anon_4_task_instance_duration,
> anon_2.anon_3_anon_4_task_instance_state AS
> anon_2_anon_3_anon_4_task_instance_state,
> anon_2.anon_3_anon_4_task_instance_max_tries AS
> anon_2_anon_3_anon_4_task_instance_max_tries,
> anon_2.anon_3_anon_4_task_instance_hostname AS
> anon_2_anon_3_anon_4_task_instance_hostname,
> anon_2.anon_3_anon_4_task_instance_unixname AS
> anon_2_anon_3_anon_4_task_instance_unixname,
> anon_2.anon_3_anon_4_task_instance_job_id AS
> anon_2_anon_3_anon_4_task_instance_job_id,
> anon_2.anon_3_anon_4_task_instance_pool AS
> anon_2_anon_3_anon_4_task_instance_pool,
> anon_2.anon_3_anon_4_task_instance_pool_slots AS
> anon_2_anon_3_anon_4_task_instance_pool_slots,
> anon_2.anon_3_anon_4_task_instance_queue AS
> anon_2_anon_3_anon_4_task_instance_queue,
> anon_2.anon_3_anon_4_task_instance_priority_weight AS
> anon_2_anon_3_anon_4_task_instance_priority_weight,
> anon_2.anon_3_anon_4_task_instance_operator AS
> anon_2_anon_3_anon_4_task_instance_operator,
> anon_2.anon_3_anon_4_task_instance_queued_dttm AS
> anon_2_anon_3_anon_4_task_instance_queued_dttm,
> anon_2.anon_3_anon_4_task_instance_pid AS
> anon_2_anon_3_anon_4_task_instance_pid,
> anon_2.anon_3_anon_4_task_instance_executor_config AS
> anon_2_anon_3_anon_4_task_instance_executor_config
> FROM (SELECT anon_3.anon_4_task_instance_try_number AS
> anon_3_anon_4_task_instance_try_number, anon_3.anon_4_task_instance_task_id
> AS anon_3_anon_4_task_instance_task_id, anon_3.anon_4_task_instance_dag_id AS
> anon_3_anon_4_task_instance_dag_id,
> anon_3.anon_4_task_instance_execution_date AS
> anon_3_anon_4_task_instance_execution_date,
> anon_3.anon_4_task_instance_start_date AS
> anon_3_anon_4_task_instance_start_date, anon_3.anon_4_task_instance_end_date
> AS anon_3_anon_4_task_instance_end_date, anon_3.anon_4_task_instance_duration
> AS anon_3_anon_4_task_instance_duration, anon_3.anon_4_task_instance_state AS
> anon_3_anon_4_task_instance_state, anon_3.anon_4_task_instance_max_tries AS
> anon_3_anon_4_task_instance_max_tries, anon_3.anon_4_task_instance_hostname
> AS anon_3_anon_4_task_instance_hostname, anon_3.anon_4_task_instance_unixname
> AS anon_3_anon_4_task_instance_unixname, anon_3.anon_4_task_instance_job_id
> AS anon_3_anon_4_task_instance_job_id, anon_3.anon_4_task_instance_pool AS
> anon_3_anon_4_task_instance_pool, anon_3.anon_4_task_instance_pool_slots AS
> anon_3_anon_4_task_instance_pool_slots, anon_3.anon_4_task_instance_queue AS
> anon_3_anon_4_task_instance_queue,
> anon_3.anon_4_task_instance_priority_weight AS
> anon_3_anon_4_task_instance_priority_weight,
> anon_3.anon_4_task_instance_operator AS anon_3_anon_4_task_instance_operator,
> anon_3.anon_4_task_instance_queued_dttm AS
> anon_3_anon_4_task_instance_queued_dttm, anon_3.anon_4_task_instance_pid AS
> anon_3_anon_4_task_instance_pid, anon_3.anon_4_task_instance_executor_config
> AS anon_3_anon_4_task_instance_executor_config
> FROM (SELECT anon_4.task_instance_try_number AS
> anon_4_task_instance_try_number, anon_4.task_instance_task_id AS
> anon_4_task_instance_task_id, anon_4.task_instance_dag_id AS
> anon_4_task_instance_dag_id, anon_4.task_instance_execution_date AS
> anon_4_task_instance_execution_date, anon_4.task_instance_start_date AS
> anon_4_task_instance_start_date, anon_4.task_instance_end_date AS
> anon_4_task_instance_end_date, anon_4.task_instance_duration AS
> anon_4_task_instance_duration, anon_4.task_instance_state AS
> anon_4_task_instance_state, anon_4.task_instance_max_tries AS
> anon_4_task_instance_max_tries, anon_4.task_instance_hostname AS
> anon_4_task_instance_hostname, anon_4.task_instance_unixname AS
> anon_4_task_instance_unixname, anon_4.task_instance_job_id AS
> anon_4_task_instance_job_id, anon_4.task_instance_pool AS
> anon_4_task_instance_pool, anon_4.task_instance_pool_slots AS
> anon_4_task_instance_pool_slots, anon_4.task_instance_queue AS
> anon_4_task_instance_queue, anon_4.task_instance_priority_weight AS
> anon_4_task_instance_priority_weight, anon_4.task_instance_operator AS
> anon_4_task_instance_operator, anon_4.task_instance_queued_dttm AS
> anon_4_task_instance_queued_dttm, anon_4.task_instance_pid AS
> anon_4_task_instance_pid, anon_4.task_instance_executor_config AS
> anon_4_task_instance_executor_config
> FROM (SELECT task_instance.try_number AS task_instance_try_number,
> task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS
> task_instance_dag_id, task_instance.execution_date AS
> task_instance_execution_date, task_instance.start_date AS
> task_instance_start_date, task_instance.end_date AS task_instance_end_date,
> task_instance.duration AS task_instance_duration, task_instance.state AS
> task_instance_state, task_instance.max_tries AS task_instance_max_tries,
> task_instance.hostname AS task_instance_hostname, task_instance.unixname AS
> task_instance_unixname, task_instance.job_id AS task_instance_job_id,
> task_instance.pool AS task_instance_pool, task_instance.pool_slots AS
> task_instance_pool_slots, task_instance.queue AS task_instance_queue,
> task_instance.priority_weight AS task_instance_priority_weight,
> task_instance.operator AS task_instance_operator, task_instance.queued_dttm
> AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid,
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_1)s AND task_instance.task_id IN
> (%(task_id_1)s, %(task_id_2)s, %(task_id_3)s, %(task_id_4)s) AND
> task_instance.execution_date >= %(execution_date_1)s AND
> task_instance.execution_date <= %(execution_date_2)s UNION SELECT
> task_instance.try_number AS task_instance_try_number, task_instance.task_id
> AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id,
> task_instance.execution_date AS task_instance_execution_date,
> task_instance.start_date AS task_instance_start_date, task_instance.end_date
> AS task_instance_end_date, task_instance.duration AS task_instance_duration,
> task_instance.state AS task_instance_state, task_instance.max_tries AS
> task_instance_max_tries, task_instance.hostname AS task_instance_hostname,
> task_instance.unixname AS task_instance_unixname, task_instance.job_id AS
> task_instance_job_id, task_instance.pool AS task_instance_pool,
> task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS
> task_instance_queue, task_instance.priority_weight AS
> task_instance_priority_weight, task_instance.operator AS
> task_instance_operator, task_instance.queued_dttm AS
> task_instance_queued_dttm, task_instance.pid AS task_instance_pid,
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_2)s AND task_instance.task_id IN
> (%(task_id_5)s) AND task_instance.execution_date >= %(execution_date_3)s AND
> task_instance.execution_date <= %(execution_date_4)s) AS anon_4 UNION SELECT
> task_instance.try_number AS task_instance_try_number, task_instance.task_id
> AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id,
> task_instance.execution_date AS task_instance_execution_date,
> task_instance.start_date AS task_instance_start_date, task_instance.end_date
> AS task_instance_end_date, task_instance.duration AS task_instance_duration,
> task_instance.state AS task_instance_state, task_instance.max_tries AS
> task_instance_max_tries, task_instance.hostname AS task_instance_hostname,
> task_instance.unixname AS task_instance_unixname, task_instance.job_id AS
> task_instance_job_id, task_instance.pool AS task_instance_pool,
> task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS
> task_instance_queue, task_instance.priority_weight AS
> task_instance_priority_weight, task_instance.operator AS
> task_instance_operator, task_instance.queued_dttm AS
> task_instance_queued_dttm, task_instance.pid AS task_instance_pid,
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_3)s AND task_instance.task_id IN
> (%(task_id_6)s) AND task_instance.execution_date >= %(execution_date_5)s AND
> task_instance.execution_date <= %(execution_date_6)s) AS anon_3 UNION SELECT
> task_instance.try_number AS task_instance_try_number, task_instance.task_id
> AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id,
> task_instance.execution_date AS task_instance_execution_date,
> task_instance.start_date AS task_instance_start_date, task_instance.end_date
> AS task_instance_end_date, task_instance.duration AS task_instance_duration,
> task_instance.state AS task_instance_state, task_instance.max_tries AS
> task_instance_max_tries, task_instance.hostname AS task_instance_hostname,
> task_instance.unixname AS task_instance_unixname, task_instance.job_id AS
> task_instance_job_id, task_instance.pool AS task_instance_pool,
> task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS
> task_instance_queue, task_instance.priority_weight AS
> task_instance_priority_weight, task_instance.operator AS
> task_instance_operator, task_instance.queued_dttm AS
> task_instance_queued_dttm, task_instance.pid AS task_instance_pid,
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_4)s AND task_instance.task_id IN
> (%(task_id_7)s) AND task_instance.execution_date >= %(execution_date_7)s AND
> task_instance.execution_date <= %(execution_date_8)s) AS anon_2) AS anon_1
> [2020-03-14 09:42:50,265] {base.py:1208} INFO - "\x1b[1m{'dag_id_1':
> 'agg_dag', 'task_id_1': 'start', 'task_id_2': 'daily_tas_0', 'task_id_3':
> 'daily_tas_1', 'task_id_4': 'daily_tas_2', 'execution_date_1':
> datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC")),
> 'execution_date_2': datetime.datetime(2015, 1, 4, 0, 0,
> tzinfo=pendulum.timezone("UTC")), 'dag_id_2': 'daily_dag', 'task_id_5':
> 'daily_tas', 'execution_date_3': datetime.datetime(2015, 1, 3, 0, 0,
> tzinfo=pendulum.timezone("UTC")), 'execution_date_4': datetime.datetime(2015,
> 1, 3, 0, 0, tzinfo=pendulum.timezone("UTC")), 'dag_id_3': 'daily_dag',
> 'task_id_6': 'daily_tas', 'execution_date_5': datetime.datetime(2015, 1, 2,
> 0, 0, tzinfo=pendulum.timezone("UTC")), 'execution_date_6':
> datetime.datetime(2015, 1, 2, 0, 0, tzinfo=pendulum.timezone("UTC")),
> 'dag_id_4': 'daily_dag', 'task_id_7': 'daily_tas', 'execution_date_7':
> datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC")),
> 'execution_date_8': datetime.datetime(2015, 1, 4, 0, 0,
> tzinfo=pendulum.timezone("UTC"))}\x1b[0m"
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)