[ 
https://issues.apache.org/jira/browse/AIRFLOW-7063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17059319#comment-17059319
 ] 

Qian Yu commented on AIRFLOW-7063:
----------------------------------

Here's a self-contained test to reproduce this slowness. 
{code:python}
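from datetime import timedelta

from airflow.models import DAG, DagBag, TaskInstance
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskMarker
from airflow.utils import timezone

# Assumed test constants (not shown in the original snippet); the execution
# dates in the SQL log below are consistent with a 2015-01-01 start date.
DEFAULT_DATE = timezone.datetime(2015, 1, 1)
DEV_NULL = "/dev/null"


def test_clear_multiple_external_task_marker():
    # Empty DagBag; the two DAGs are registered in it manually.
    dag_bag = DagBag(dag_folder=DEV_NULL, include_examples=False)
    daily_dag = DAG("daily_dag", start_date=DEFAULT_DATE, schedule_interval="@daily")
    agg_dag = DAG("agg_dag", start_date=DEFAULT_DATE, schedule_interval="@daily")
    dag_bag.bag_dag(daily_dag, None, daily_dag)
    dag_bag.bag_dag(agg_dag, None, agg_dag)

    daily_task = DummyOperator(task_id="daily_tas", dag=daily_dag)

    start = DummyOperator(task_id="start", dag=agg_dag)
    number_union = 20  # number of ExternalTaskMarker tasks
    for i in range(number_union):
        # Marker i points at daily_task, i days before agg_dag's ds.
        task = ExternalTaskMarker(task_id=f"{daily_task.task_id}_{i}",
                                  external_dag_id=daily_dag.dag_id,
                                  external_task_id=daily_task.task_id,
                                  execution_date="{{ macros.ds_add(ds, -1 * %s) }}" % i,
                                  dag=agg_dag)
        start >> task

    # Run every task of both DAGs on enough consecutive days that each
    # marker's target task instance exists in the database.
    for delta in range(len(agg_dag.tasks)):
        execution_date = DEFAULT_DATE + timedelta(days=delta)
        for dag in dag_bag.dags.values():
            for task in dag.tasks:
                ti = TaskInstance(task=task, execution_date=execution_date)
                ti.run()

    # Clear the last day; this follows every ExternalTaskMarker into daily_dag.
    agg_dag.clear(start_date=execution_date, end_date=execution_date, dag_bag=dag_bag)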
{code}
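The comment does not show how the timings below were collected. A minimal stdlib sketch of such a sweep, assuming the test above is refactored into a hypothetical {{setup(n)}} that builds and runs both DAGs with {{number_union=n}} and returns a zero-argument callable wrapping the final {{agg_dag.clear(...)}} call. Only that callable is timed, so DAG construction and {{ti.run()}} are excluded. {{time_sweep}} and {{setup}} are illustrative names, not part of Airflow.

```python
import time
from typing import Callable, Iterable, List, Tuple


def time_sweep(setup: Callable[[int], Callable[[], object]],
               sizes: Iterable[int]) -> List[Tuple[int, float]]:
    """Time one clear call per size; the setup cost is not included."""
    results = []
    for n in sizes:
        # Hypothetical: build both DAGs with number_union=n, run them,
        # and return a callable that performs agg_dag.clear(...).
        clear = setup(n)
        started = time.perf_counter()
        clear()  # only this call is timed
        elapsed = time.perf_counter() - started
        print(f"number_union: {n}, dag.clear() takes: {elapsed}s")
        results.append((n, elapsed))
    return results


if __name__ == "__main__":
    # Stand-in workload so the sketch runs without Airflow installed.
    def fake_setup(n):
        return lambda: time.sleep(0.001 * n)

    time_sweep(fake_setup, range(1, 6))
```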

This is how long {{dag.clear()}} takes for increasing {{number_union}}, before
and after {{tis.count()}} is removed:

With {{tis.count()}}, {{dag.clear()}} slows down dramatically as more
{{ExternalTaskMarker}} tasks are used; from about 20 markers on, each
additional marker roughly doubles the time:
{code}
number_union: 1, dag.clear() takes: 0.05681347846984863s
number_union: 2, dag.clear() takes: 0.06906294822692871s
number_union: 3, dag.clear() takes: 0.0995175838470459s
number_union: 4, dag.clear() takes: 0.1320326328277588s
number_union: 5, dag.clear() takes: 0.15970063209533691s
number_union: 6, dag.clear() takes: 0.19741106033325195s
number_union: 7, dag.clear() takes: 0.22883057594299316s
number_union: 8, dag.clear() takes: 0.2697880268096924s
number_union: 9, dag.clear() takes: 0.29255032539367676s
number_union: 10, dag.clear() takes: 0.33300089836120605s
number_union: 11, dag.clear() takes: 0.37502217292785645s
number_union: 12, dag.clear() takes: 0.4150240421295166s
number_union: 13, dag.clear() takes: 0.5335328578948975s
number_union: 14, dag.clear() takes: 0.5695874691009521s
number_union: 15, dag.clear() takes: 0.652968168258667s
number_union: 16, dag.clear() takes: 0.951585054397583s
number_union: 17, dag.clear() takes: 1.356891393661499s
number_union: 18, dag.clear() takes: 1.9704437255859375s
number_union: 19, dag.clear() takes: 3.2505640983581543s
number_union: 20, dag.clear() takes: 5.810389757156372s
number_union: 21, dag.clear() takes: 11.054765701293945s
number_union: 22, dag.clear() takes: 24.731156826019287s
number_union: 23, dag.clear() takes: 41.684504985809326s
number_union: 24, dag.clear() takes: 82.07931756973267s
number_union: 25, dag.clear() takes: 164.3687846660614s
{code} 
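To quantify the growth in the table above, a short sketch that computes the ratio between consecutive timings for the last six rows (values copied from the table, rounded to 0.01 s). {{step_ratios}} and {{geometric_mean}} are illustrative helpers.

```python
import math

# Timings from the tis.count() table above (number_union -> seconds), rounded to 0.01 s.
COUNT_TIMINGS = {20: 5.81, 21: 11.05, 22: 24.73, 23: 41.68, 24: 82.08, 25: 164.37}


def step_ratios(timings):
    """Map each n to timings[n] / timings[previous n] for consecutive sizes."""
    sizes = sorted(timings)
    return {n: timings[n] / timings[prev] for prev, n in zip(sizes, sizes[1:])}


def geometric_mean(values):
    values = list(values)
    return math.exp(sum(math.log(v) for v in values) / len(values))


ratios = step_ratios(COUNT_TIMINGS)
for n, r in ratios.items():
    print(f"{n - 1} -> {n}: x{r:.2f}")
print(f"geometric mean: x{geometric_mean(ratios.values()):.2f}")
```

This prints ratios between x1.69 and x2.24 with a geometric mean of x1.95: each extra marker roughly doubles the run time, i.e. exponential rather than linear growth.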

After replacing {{tis.count()}} with {{len(tis.all())}}, the performance of
{{dag.clear()}} is much more reasonable and roughly linear in the number of
{{ExternalTaskMarker}} tasks:
{code}
number_union: 1, dag.clear() takes: 0.04764533042907715s
number_union: 2, dag.clear() takes: 0.06222081184387207s
number_union: 3, dag.clear() takes: 0.08550524711608887s
number_union: 4, dag.clear() takes: 0.1136934757232666s
number_union: 5, dag.clear() takes: 0.14034652709960938s
number_union: 6, dag.clear() takes: 0.18161964416503906s
number_union: 7, dag.clear() takes: 0.20060372352600098s
number_union: 8, dag.clear() takes: 0.24526739120483398s
number_union: 9, dag.clear() takes: 0.2750375270843506s
number_union: 10, dag.clear() takes: 0.29474949836730957s
number_union: 11, dag.clear() takes: 0.37499213218688965s
number_union: 12, dag.clear() takes: 0.35225915908813477s
number_union: 13, dag.clear() takes: 0.3956146240234375s
number_union: 14, dag.clear() takes: 0.45566248893737793s
number_union: 15, dag.clear() takes: 0.45056653022766113s
number_union: 16, dag.clear() takes: 0.5372872352600098s
number_union: 17, dag.clear() takes: 0.5812375545501709s
number_union: 18, dag.clear() takes: 0.5535945892333984s
number_union: 19, dag.clear() takes: 0.6553409099578857s
number_union: 20, dag.clear() takes: 0.7062091827392578s
number_union: 21, dag.clear() takes: 0.7340657711029053s
number_union: 22, dag.clear() takes: 0.7563188076019287s
number_union: 23, dag.clear() takes: 0.826127290725708s
number_union: 24, dag.clear() takes: 0.8763349056243896s
number_union: 25, dag.clear() takes: 0.903679370880127s
{code}
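To check the "roughly linear" reading, a sketch that fits an ordinary least-squares line to the table above (values rounded to 1 ms). {{least_squares}} is an illustrative helper, not part of any library used in the issue.

```python
# Timings from the len(tis.all()) table above for number_union = 1..25, rounded to 1 ms.
LEN_TIMINGS = [
    0.048, 0.062, 0.086, 0.114, 0.140, 0.182, 0.201, 0.245, 0.275, 0.295,
    0.375, 0.352, 0.396, 0.456, 0.451, 0.537, 0.581, 0.554, 0.655, 0.706,
    0.734, 0.756, 0.826, 0.876, 0.904,
]


def least_squares(xs, ys):
    """Fit y = slope * x + intercept; also return the coefficient of determination."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    syy = sum((y - mean_y) ** 2 for y in ys)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    r_squared = sxy ** 2 / (sxx * syy)
    return slope, intercept, r_squared


sizes = list(range(1, len(LEN_TIMINGS) + 1))
slope, intercept, r_squared = least_squares(sizes, LEN_TIMINGS)
print(f"slope: ~{slope * 1000:.0f} ms per ExternalTaskMarker, r^2 = {r_squared:.3f}")
```

This prints a slope of about 36 ms per marker with r² of about 0.99, so a straight line describes these timings well.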

This example uses {{ExternalTaskMarker}}, but I believe the same slowness will
also occur if many nested {{SubDagOperator}} tasks are used (although nesting
many levels of {{SubDagOperator}} makes less sense).
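A minimal, Airflow-free sketch of the control flow after replacing {{tis.count()}} with {{len(tis.all())}}. {{FakeQuery}} and {{clear_tail}} are hypothetical names; the real {{DAG.clear()}} also handles the confirmation prompt, session handling and other details omitted here. The point is that the matching rows are fetched once and counted in Python, so no {{count(*)}} query is ever built; in this sketch the fetched rows are then reused for the clearing step.

```python
class FakeQuery:
    """Hypothetical stand-in for a SQLAlchemy Query; records how it is used."""

    def __init__(self, rows):
        self._rows = list(rows)
        self.all_calls = 0
        self.count_calls = 0

    def all(self):
        self.all_calls += 1
        return list(self._rows)

    def count(self):
        self.count_calls += 1
        return len(self._rows)


def clear_tail(tis, dry_run, clear_rows):
    """Sketch of the end of dag.clear() with tis.count() replaced by len(tis.all())."""
    if dry_run:
        return tis.all()
    # Fetch the rows once and count them in Python instead of asking
    # SQLAlchemy to wrap the (nested UNION) query in a SELECT count(*).
    rows = tis.all()
    count = len(rows)
    if count == 0:
        return 0
    clear_rows(rows)  # reuse the already-fetched rows for the actual clearing
    return count


query = FakeQuery(["ti_a", "ti_b", "ti_c"])
cleared = []
print(clear_tail(query, dry_run=False, clear_rows=cleared.extend))  # 3
print(query.count_calls, query.all_calls)  # 0 1
```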


> dag.clear() slowness caused by multiple UNION statements and tis.count()
> ------------------------------------------------------------------------
>
>                 Key: AIRFLOW-7063
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-7063
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: webserver
>    Affects Versions: 1.10.9
>            Reporter: Qian Yu
>            Assignee: Qian Yu
>            Priority: Major
>
> When multiple {{ExternalTaskMarker}} tasks are used, {{dag.clear()}} becomes
> very slow when they are all cleared together. The slowness turns out to come
> from this line of code in {{dag.clear()}}:
> {code:python}
> # Excerpt from DAG.clear(); tis is a SQLAlchemy Query of TaskInstances.
> if dry_run:
>     tis = tis.all()
>     session.expunge_all()
>     return tis
> count = tis.count()  # <------- This line is the culprit
> do_it = True
> if count == 0:
>     return 0
> {code}
> This is the SQL generated by {{tis.count()}} when three {{ExternalTaskMarker}}
> tasks are cleared together. Note that there is nothing wrong with the SQL
> itself: it is reasonably efficient when executed on Postgres, even with more
> UNION statements (e.g. 30 UNION statements take about 13 ms in the Docker
> container I started with Breeze). However, it takes more than three minutes
> for SQLAlchemy to construct this count query before it is sent to the
> database.
> The fix is simple: get rid of the {{count()}} call and query all the entries
> from the DB instead. The function becomes ten times faster when
> {{tis.count()}} is removed.
> Several people have reported similar problems with SQLAlchemy's {{count()}}
> being slower than the query itself, and it does not look like SQLAlchemy is
> going to change this:
>
> [https://stackoverflow.com/questions/14754994/why-is-sqlalchemy-count-much-slower-than-the-raw-query]
> [https://gist.github.com/hest/8798884]
>
> {code:sql}
> [2020-03-14 09:42:50,264] {base.py:1203} INFO - SELECT count(*) AS count_1
> FROM (SELECT anon_2.anon_3_anon_4_task_instance_try_number AS 
> anon_2_anon_3_anon_4_task_instance_try_number, 
> anon_2.anon_3_anon_4_task_instance_task_id AS 
> anon_2_anon_3_anon_4_task_instance_task_id, 
> anon_2.anon_3_anon_4_task_instance_dag_id AS 
> anon_2_anon_3_anon_4_task_instance_dag_id, 
> anon_2.anon_3_anon_4_task_instance_execution_date AS 
> anon_2_anon_3_anon_4_task_instance_execution_date, 
> anon_2.anon_3_anon_4_task_instance_start_date AS 
> anon_2_anon_3_anon_4_task_instance_start_date, 
> anon_2.anon_3_anon_4_task_instance_end_date AS 
> anon_2_anon_3_anon_4_task_instance_end_date, 
> anon_2.anon_3_anon_4_task_instance_duration AS 
> anon_2_anon_3_anon_4_task_instance_duration, 
> anon_2.anon_3_anon_4_task_instance_state AS 
> anon_2_anon_3_anon_4_task_instance_state, 
> anon_2.anon_3_anon_4_task_instance_max_tries AS 
> anon_2_anon_3_anon_4_task_instance_max_tries, 
> anon_2.anon_3_anon_4_task_instance_hostname AS 
> anon_2_anon_3_anon_4_task_instance_hostname, 
> anon_2.anon_3_anon_4_task_instance_unixname AS 
> anon_2_anon_3_anon_4_task_instance_unixname, 
> anon_2.anon_3_anon_4_task_instance_job_id AS 
> anon_2_anon_3_anon_4_task_instance_job_id, 
> anon_2.anon_3_anon_4_task_instance_pool AS 
> anon_2_anon_3_anon_4_task_instance_pool, 
> anon_2.anon_3_anon_4_task_instance_pool_slots AS 
> anon_2_anon_3_anon_4_task_instance_pool_slots, 
> anon_2.anon_3_anon_4_task_instance_queue AS 
> anon_2_anon_3_anon_4_task_instance_queue, 
> anon_2.anon_3_anon_4_task_instance_priority_weight AS 
> anon_2_anon_3_anon_4_task_instance_priority_weight, 
> anon_2.anon_3_anon_4_task_instance_operator AS 
> anon_2_anon_3_anon_4_task_instance_operator, 
> anon_2.anon_3_anon_4_task_instance_queued_dttm AS 
> anon_2_anon_3_anon_4_task_instance_queued_dttm, 
> anon_2.anon_3_anon_4_task_instance_pid AS 
> anon_2_anon_3_anon_4_task_instance_pid, 
> anon_2.anon_3_anon_4_task_instance_executor_config AS 
> anon_2_anon_3_anon_4_task_instance_executor_config
> FROM (SELECT anon_3.anon_4_task_instance_try_number AS 
> anon_3_anon_4_task_instance_try_number, anon_3.anon_4_task_instance_task_id 
> AS anon_3_anon_4_task_instance_task_id, anon_3.anon_4_task_instance_dag_id AS 
> anon_3_anon_4_task_instance_dag_id, 
> anon_3.anon_4_task_instance_execution_date AS 
> anon_3_anon_4_task_instance_execution_date, 
> anon_3.anon_4_task_instance_start_date AS 
> anon_3_anon_4_task_instance_start_date, anon_3.anon_4_task_instance_end_date 
> AS anon_3_anon_4_task_instance_end_date, anon_3.anon_4_task_instance_duration 
> AS anon_3_anon_4_task_instance_duration, anon_3.anon_4_task_instance_state AS 
> anon_3_anon_4_task_instance_state, anon_3.anon_4_task_instance_max_tries AS 
> anon_3_anon_4_task_instance_max_tries, anon_3.anon_4_task_instance_hostname 
> AS anon_3_anon_4_task_instance_hostname, anon_3.anon_4_task_instance_unixname 
> AS anon_3_anon_4_task_instance_unixname, anon_3.anon_4_task_instance_job_id 
> AS anon_3_anon_4_task_instance_job_id, anon_3.anon_4_task_instance_pool AS 
> anon_3_anon_4_task_instance_pool, anon_3.anon_4_task_instance_pool_slots AS 
> anon_3_anon_4_task_instance_pool_slots, anon_3.anon_4_task_instance_queue AS 
> anon_3_anon_4_task_instance_queue, 
> anon_3.anon_4_task_instance_priority_weight AS 
> anon_3_anon_4_task_instance_priority_weight, 
> anon_3.anon_4_task_instance_operator AS anon_3_anon_4_task_instance_operator, 
> anon_3.anon_4_task_instance_queued_dttm AS 
> anon_3_anon_4_task_instance_queued_dttm, anon_3.anon_4_task_instance_pid AS 
> anon_3_anon_4_task_instance_pid, anon_3.anon_4_task_instance_executor_config 
> AS anon_3_anon_4_task_instance_executor_config
> FROM (SELECT anon_4.task_instance_try_number AS 
> anon_4_task_instance_try_number, anon_4.task_instance_task_id AS 
> anon_4_task_instance_task_id, anon_4.task_instance_dag_id AS 
> anon_4_task_instance_dag_id, anon_4.task_instance_execution_date AS 
> anon_4_task_instance_execution_date, anon_4.task_instance_start_date AS 
> anon_4_task_instance_start_date, anon_4.task_instance_end_date AS 
> anon_4_task_instance_end_date, anon_4.task_instance_duration AS 
> anon_4_task_instance_duration, anon_4.task_instance_state AS 
> anon_4_task_instance_state, anon_4.task_instance_max_tries AS 
> anon_4_task_instance_max_tries, anon_4.task_instance_hostname AS 
> anon_4_task_instance_hostname, anon_4.task_instance_unixname AS 
> anon_4_task_instance_unixname, anon_4.task_instance_job_id AS 
> anon_4_task_instance_job_id, anon_4.task_instance_pool AS 
> anon_4_task_instance_pool, anon_4.task_instance_pool_slots AS 
> anon_4_task_instance_pool_slots, anon_4.task_instance_queue AS 
> anon_4_task_instance_queue, anon_4.task_instance_priority_weight AS 
> anon_4_task_instance_priority_weight, anon_4.task_instance_operator AS 
> anon_4_task_instance_operator, anon_4.task_instance_queued_dttm AS 
> anon_4_task_instance_queued_dttm, anon_4.task_instance_pid AS 
> anon_4_task_instance_pid, anon_4.task_instance_executor_config AS 
> anon_4_task_instance_executor_config
> FROM (SELECT task_instance.try_number AS task_instance_try_number, 
> task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS 
> task_instance_dag_id, task_instance.execution_date AS 
> task_instance_execution_date, task_instance.start_date AS 
> task_instance_start_date, task_instance.end_date AS task_instance_end_date, 
> task_instance.duration AS task_instance_duration, task_instance.state AS 
> task_instance_state, task_instance.max_tries AS task_instance_max_tries, 
> task_instance.hostname AS task_instance_hostname, task_instance.unixname AS 
> task_instance_unixname, task_instance.job_id AS task_instance_job_id, 
> task_instance.pool AS task_instance_pool, task_instance.pool_slots AS 
> task_instance_pool_slots, task_instance.queue AS task_instance_queue, 
> task_instance.priority_weight AS task_instance_priority_weight, 
> task_instance.operator AS task_instance_operator, task_instance.queued_dttm 
> AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, 
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_1)s AND task_instance.task_id IN 
> (%(task_id_1)s, %(task_id_2)s, %(task_id_3)s, %(task_id_4)s) AND 
> task_instance.execution_date >= %(execution_date_1)s AND 
> task_instance.execution_date <= %(execution_date_2)s UNION SELECT 
> task_instance.try_number AS task_instance_try_number, task_instance.task_id 
> AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, 
> task_instance.execution_date AS task_instance_execution_date, 
> task_instance.start_date AS task_instance_start_date, task_instance.end_date 
> AS task_instance_end_date, task_instance.duration AS task_instance_duration, 
> task_instance.state AS task_instance_state, task_instance.max_tries AS 
> task_instance_max_tries, task_instance.hostname AS task_instance_hostname, 
> task_instance.unixname AS task_instance_unixname, task_instance.job_id AS 
> task_instance_job_id, task_instance.pool AS task_instance_pool, 
> task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS 
> task_instance_queue, task_instance.priority_weight AS 
> task_instance_priority_weight, task_instance.operator AS 
> task_instance_operator, task_instance.queued_dttm AS 
> task_instance_queued_dttm, task_instance.pid AS task_instance_pid, 
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_2)s AND task_instance.task_id IN 
> (%(task_id_5)s) AND task_instance.execution_date >= %(execution_date_3)s AND 
> task_instance.execution_date <= %(execution_date_4)s) AS anon_4 UNION SELECT 
> task_instance.try_number AS task_instance_try_number, task_instance.task_id 
> AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, 
> task_instance.execution_date AS task_instance_execution_date, 
> task_instance.start_date AS task_instance_start_date, task_instance.end_date 
> AS task_instance_end_date, task_instance.duration AS task_instance_duration, 
> task_instance.state AS task_instance_state, task_instance.max_tries AS 
> task_instance_max_tries, task_instance.hostname AS task_instance_hostname, 
> task_instance.unixname AS task_instance_unixname, task_instance.job_id AS 
> task_instance_job_id, task_instance.pool AS task_instance_pool, 
> task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS 
> task_instance_queue, task_instance.priority_weight AS 
> task_instance_priority_weight, task_instance.operator AS 
> task_instance_operator, task_instance.queued_dttm AS 
> task_instance_queued_dttm, task_instance.pid AS task_instance_pid, 
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_3)s AND task_instance.task_id IN 
> (%(task_id_6)s) AND task_instance.execution_date >= %(execution_date_5)s AND 
> task_instance.execution_date <= %(execution_date_6)s) AS anon_3 UNION SELECT 
> task_instance.try_number AS task_instance_try_number, task_instance.task_id 
> AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, 
> task_instance.execution_date AS task_instance_execution_date, 
> task_instance.start_date AS task_instance_start_date, task_instance.end_date 
> AS task_instance_end_date, task_instance.duration AS task_instance_duration, 
> task_instance.state AS task_instance_state, task_instance.max_tries AS 
> task_instance_max_tries, task_instance.hostname AS task_instance_hostname, 
> task_instance.unixname AS task_instance_unixname, task_instance.job_id AS 
> task_instance_job_id, task_instance.pool AS task_instance_pool, 
> task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS 
> task_instance_queue, task_instance.priority_weight AS 
> task_instance_priority_weight, task_instance.operator AS 
> task_instance_operator, task_instance.queued_dttm AS 
> task_instance_queued_dttm, task_instance.pid AS task_instance_pid, 
> task_instance.executor_config AS task_instance_executor_config
> FROM task_instance
> WHERE task_instance.dag_id LIKE %(dag_id_4)s AND task_instance.task_id IN 
> (%(task_id_7)s) AND task_instance.execution_date >= %(execution_date_7)s AND 
> task_instance.execution_date <= %(execution_date_8)s) AS anon_2) AS anon_1
> [2020-03-14 09:42:50,265] {base.py:1208} INFO - "{'dag_id_1': 
> 'agg_dag', 'task_id_1': 'start', 'task_id_2': 'daily_tas_0', 'task_id_3': 
> 'daily_tas_1', 'task_id_4': 'daily_tas_2', 'execution_date_1': 
> datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC")), 
> 'execution_date_2': datetime.datetime(2015, 1, 4, 0, 0, 
> tzinfo=pendulum.timezone("UTC")), 'dag_id_2': 'daily_dag', 'task_id_5': 
> 'daily_tas', 'execution_date_3': datetime.datetime(2015, 1, 3, 0, 0, 
> tzinfo=pendulum.timezone("UTC")), 'execution_date_4': datetime.datetime(2015, 
> 1, 3, 0, 0, tzinfo=pendulum.timezone("UTC")), 'dag_id_3': 'daily_dag', 
> 'task_id_6': 'daily_tas', 'execution_date_5': datetime.datetime(2015, 1, 2, 
> 0, 0, tzinfo=pendulum.timezone("UTC")), 'execution_date_6': 
> datetime.datetime(2015, 1, 2, 0, 0, tzinfo=pendulum.timezone("UTC")), 
> 'dag_id_4': 'daily_dag', 'task_id_7': 'daily_tas', 'execution_date_7': 
> datetime.datetime(2015, 1, 4, 0, 0, tzinfo=pendulum.timezone("UTC")), 
> 'execution_date_8': datetime.datetime(2015, 1, 4, 0, 0, 
> tzinfo=pendulum.timezone("UTC"))}"
> {code}
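A small stdlib sketch of the two ways of counting the rows matched by a UNION query like the one in the log. It uses {{sqlite3}} rather than SQLAlchemy, a flat UNION instead of the nested {{anon_N}} subqueries, and {{=}} instead of {{LIKE}} for {{dag_id}}; the table has only the columns needed here. It therefore does not reproduce the SQLAlchemy query-construction slowness; it only shows that counting the fetched rows gives the same answer as wrapping the query in {{SELECT count(*)}}. {{build_union_sql}} is a hypothetical helper.

```python
import sqlite3

BRANCH = (
    "SELECT dag_id, task_id, execution_date FROM task_instance "
    "WHERE dag_id = ? AND task_id IN ({}) "
    "AND execution_date >= ? AND execution_date <= ?"
)


def build_union_sql(filters):
    """Combine one SELECT per (dag_id, task_ids, start, end) filter with UNION."""
    parts, params = [], []
    for dag_id, task_ids, start, end in filters:
        parts.append(BRANCH.format(", ".join("?" * len(task_ids))))
        params += [dag_id, *task_ids, start, end]
    return " UNION ".join(parts), params


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, execution_date TEXT, "
    "PRIMARY KEY (dag_id, task_id, execution_date))"
)
days = ["2015-01-01", "2015-01-02", "2015-01-03", "2015-01-04"]
agg_tasks = ["start", "daily_tas_0", "daily_tas_1", "daily_tas_2"]
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?)",
    [("agg_dag", t, d) for t in agg_tasks for d in days]
    + [("daily_dag", "daily_tas", d) for d in days],
)

# Same filters as the bound parameters shown in the log above.
filters = [
    ("agg_dag", agg_tasks, "2015-01-04", "2015-01-04"),
    ("daily_dag", ["daily_tas"], "2015-01-03", "2015-01-03"),
    ("daily_dag", ["daily_tas"], "2015-01-02", "2015-01-02"),
    ("daily_dag", ["daily_tas"], "2015-01-04", "2015-01-04"),
]
sql, params = build_union_sql(filters)

# Option 1: wrap the UNION in SELECT count(*), as Query.count() does in the log.
via_count = conn.execute(f"SELECT count(*) FROM ({sql}) AS u", params).fetchone()[0]
# Option 2: fetch the rows and count them in Python, like len(tis.all()).
rows = conn.execute(sql, params).fetchall()
print(via_count, len(rows))  # 7 7
```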



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
