[ 
https://issues.apache.org/jira/browse/AIRFLOW-246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15335724#comment-15335724
 ] 

Kengo Seki commented on AIRFLOW-246:
------------------------------------

Multiple left outer joins seem to hurt performance. I think we can rewrite 
the query in question by replacing the left outer joins with inner joins 
combined by UNION ALL, like this:

{code:sql}
SELECT
  dag_id AS task_instance_dag_id,
  state AS task_instance_state,
  count(*) as count_1
FROM (
  SELECT
    task_instance.dag_id,
    task_instance.state
  FROM
    task_instance
  JOIN (
    SELECT
      dag_run.dag_id AS dag_id,
      dag_run.execution_date AS execution_date
    FROM
      dag_run
    WHERE
      dag_run.state = 'running'
  ) AS running_dag_run
  ON
    running_dag_run.dag_id = task_instance.dag_id
  AND
    running_dag_run.execution_date = task_instance.execution_date
  WHERE
    task_id IN ...

  UNION ALL

  SELECT
    task_instance.dag_id,
    task_instance.state
  FROM
    task_instance
  JOIN (
    SELECT
      dag_run.dag_id AS dag_id,
      max(dag_run.execution_date) AS execution_date
    FROM
      dag_run
    GROUP BY
      dag_run.dag_id
  ) AS last_dag_run
  ON
    last_dag_run.dag_id = task_instance.dag_id
  AND
    last_dag_run.execution_date = task_instance.execution_date
  WHERE
    task_id IN ...
) t
GROUP BY
  dag_id,
  state;
{code}
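
One thing that may be worth checking before adopting the rewrite: UNION ALL
keeps duplicates, so a task instance whose dag run is both 'running' and the
latest run for its dag would be counted once by the LEFT OUTER JOIN version
but twice by the UNION ALL version. The sketch below illustrates this on a
tiny in-memory SQLite database. The four-row schema, the 't1' task id and the
sample dates are made up for the illustration, and the plain UNION variant
assumes that (task_id, dag_id, execution_date) identifies a task instance
uniquely. It is not the query Airflow runs, only a way to compare the shapes.

```python
import sqlite3

# Hypothetical miniature schema: only the columns the queries touch.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, state TEXT);
CREATE TABLE task_instance (
  task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT);
-- dag_a: an older run is still running, the latest run has finished.
INSERT INTO dag_run VALUES ('dag_a', '2016-06-01', 'running');
INSERT INTO dag_run VALUES ('dag_a', '2016-06-02', 'success');
-- dag_b: the latest run is also the running one.
INSERT INTO dag_run VALUES ('dag_b', '2016-06-01', 'success');
INSERT INTO dag_run VALUES ('dag_b', '2016-06-02', 'running');
INSERT INTO task_instance VALUES ('t1', 'dag_a', '2016-06-01', 'running');
INSERT INTO task_instance VALUES ('t1', 'dag_a', '2016-06-02', 'success');
INSERT INTO task_instance VALUES ('t1', 'dag_b', '2016-06-01', 'success');
INSERT INTO task_instance VALUES ('t1', 'dag_b', '2016-06-02', 'running');
""")

RUNNING = "SELECT dag_id, execution_date FROM dag_run WHERE state = 'running'"
LAST = ("SELECT dag_id, max(execution_date) AS execution_date "
        "FROM dag_run GROUP BY dag_id")

# Original shape: two left outer joins filtered with OR.
LEFT_JOIN_SQL = """
SELECT ti.dag_id, ti.state, count(ti.task_id)
FROM task_instance ti
LEFT OUTER JOIN ({running}) AS running_dag_run
  ON running_dag_run.dag_id = ti.dag_id
 AND running_dag_run.execution_date = ti.execution_date
LEFT OUTER JOIN ({last}) AS last_dag_run
  ON last_dag_run.dag_id = ti.dag_id
 AND last_dag_run.execution_date = ti.execution_date
WHERE ti.task_id IN ('t1')
  AND (running_dag_run.dag_id IS NOT NULL OR last_dag_run.dag_id IS NOT NULL)
GROUP BY ti.dag_id, ti.state
ORDER BY ti.dag_id, ti.state
""".format(running=RUNNING, last=LAST)

# Rewritten shape: two inner joins combined with a set operator.
# The key columns are selected so that plain UNION removes only true repeats.
BRANCH = """
  SELECT ti.task_id AS task_id, ti.dag_id AS dag_id,
         ti.execution_date AS execution_date, ti.state AS state
  FROM task_instance ti
  JOIN ({sub}) AS r
    ON r.dag_id = ti.dag_id AND r.execution_date = ti.execution_date
  WHERE ti.task_id IN ('t1')
"""
SPLIT_SQL = ("SELECT dag_id, state, count(*) FROM ("
             + BRANCH.format(sub=RUNNING) + " {set_op} "
             + BRANCH.format(sub=LAST)
             + ") t GROUP BY dag_id, state ORDER BY dag_id, state")

left_join_rows = conn.execute(LEFT_JOIN_SQL).fetchall()
union_all_rows = conn.execute(SPLIT_SQL.format(set_op="UNION ALL")).fetchall()
union_rows = conn.execute(SPLIT_SQL.format(set_op="UNION")).fetchall()

print("left join:", left_join_rows)
print("union all:", union_all_rows)
print("union    :", union_rows)
```

On this sample the UNION ALL form reports 2 for dag_b/running where the other
two report 1. Whether that matters depends on the data: if no running dag run
is also the latest run for its dag, the counts agree. Plain UNION avoids the
double count but adds a de-duplication step, so its cost would need to be
measured separately.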

I compared the two queries on some dummy data and got a roughly 3-4x 
improvement (0.39-0.40 sec vs. 0.11-0.13 sec):

{code}
mysql> select count(*) from dag_run;
+----------+
| count(*) |
+----------+
|     3417 |
+----------+
1 row in set (0.00 sec)

mysql> select count(*) from task_instance;
+----------+
| count(*) |
+----------+
|   229089 |
+----------+
1 row in set (0.00 sec)

mysql> SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state 
AS task_instance_state, count(task_instance.task_id) AS count_1 FROM 
task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, 
dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 
'running') AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id 
AND running_dag_run.execution_date = task_instance.execution_date LEFT OUTER 
JOIN (SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS 
execution_date FROM dag_run GROUP BY dag_run.dag_id) AS last_dag_run ON 
last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date = 
task_instance.execution_date WHERE task_instance.task_id IN ('all_success', 
'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 
'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 
'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 
'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b', 
'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 
'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 
'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 
'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 
'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 
'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 
'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 
'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 
'templated', 'test_trigger_dagrun', 'true_1', 'true_2') AND 
(running_dag_run.dag_id IS NOT NULL OR last_dag_run.dag_id IS NOT NULL) GROUP 
BY task_instance.dag_id, task_instance.state; 
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.39 sec)

mysql> SELECT dag_id AS task_instance_dag_id, state AS task_instance_state, 
count(*) as count_1 FROM ( SELECT task_instance.dag_id, task_instance.state 
FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id, 
dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 
'running' ) AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id 
AND running_dag_run.execution_date = task_instance.execution_date WHERE task_id 
IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 
'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 
'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end', 
'false_1', 'false_2', 'final_1', 'final_2', 'follow_branch_a', 
'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op', 
'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op', 
'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning', 
'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this', 
'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1', 
'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5', 
'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3', 
'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2', 
'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun', 
'true_1', 'true_2') UNION ALL SELECT task_instance.dag_id, task_instance.state 
FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id, 
max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY 
dag_run.dag_id ) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id 
AND last_dag_run.execution_date = task_instance.execution_date WHERE task_id IN 
('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 
'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 
'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 
'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b', 
'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 
'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 
'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 
'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 
'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 
'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 
'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 
'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 
'templated', 'test_trigger_dagrun', 'true_1', 'true_2') ) t GROUP BY dag_id, 
state;
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.11 sec)

mysql> -- swap execution order, just in case
mysql> reset query cache;
Query OK, 0 rows affected (0.00 sec)

mysql> SELECT dag_id AS task_instance_dag_id, state AS task_instance_state, 
count(*) as count_1 FROM ( SELECT task_instance.dag_id, task_instance.state 
FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id, 
dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 
'running' ) AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id 
AND running_dag_run.execution_date = task_instance.execution_date WHERE task_id 
IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 
'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 
'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end', 
'false_1', 'false_2', 'final_1', 'final_2', 'follow_branch_a', 
'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op', 
'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op', 
'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning', 
'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this', 
'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1', 
'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5', 
'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3', 
'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2', 
'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun', 
'true_1', 'true_2') UNION ALL SELECT task_instance.dag_id, task_instance.state 
FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id, 
max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY 
dag_run.dag_id ) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id 
AND last_dag_run.execution_date = task_instance.execution_date WHERE task_id IN 
('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 
'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 
'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 
'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b', 
'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 
'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 
'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 
'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 
'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 
'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 
'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 
'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 
'templated', 'test_trigger_dagrun', 'true_1', 'true_2') ) t GROUP BY dag_id, 
state;
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.13 sec)

mysql> SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state 
AS task_instance_state, count(task_instance.task_id) AS count_1 FROM 
task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, 
dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 
'running') AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id 
AND running_dag_run.execution_date = task_instance.execution_date LEFT OUTER 
JOIN (SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS 
execution_date FROM dag_run GROUP BY dag_run.dag_id) AS last_dag_run ON 
last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date = 
task_instance.execution_date WHERE task_instance.task_id IN ('all_success', 
'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 
'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 
'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 
'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b', 
'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 
'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 
'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 
'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 
'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 
'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 
'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 
'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 
'templated', 'test_trigger_dagrun', 'true_1', 'true_2') AND 
(running_dag_run.dag_id IS NOT NULL OR last_dag_run.dag_id IS NOT NULL) GROUP 
BY task_instance.dag_id, task_instance.state;
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.40 sec)
{code}
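
The comparison above was done by hand in the mysql client, running each query
in both orders. A minimal sketch of the same idea as a repeatable script is
shown below, assuming Python's stdlib sqlite3 as a stand-in backend. The
helper name median_seconds, the generated table contents and the two
placeholder statements are all hypothetical; absolute timings from SQLite say
nothing about MySQL, so only the measuring pattern carries over.

```python
import sqlite3
import statistics
import time


def median_seconds(conn, sql, repeats=5):
    """Run sql `repeats` times and return the median wall-clock seconds."""
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        conn.execute(sql).fetchall()  # fetch all rows so the query fully runs
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT, dag_id TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?)",
    [("task_%d" % i, "dag_%d" % (i % 50), "success") for i in range(10000)],
)

# Placeholder statements: substitute the two queries being compared.
queries = {
    "a": "SELECT dag_id, state, count(task_id) FROM task_instance "
         "GROUP BY dag_id, state",
    "b": "SELECT dag_id, state, count(*) FROM task_instance "
         "GROUP BY dag_id, state",
}

# Alternate the order so neither query always runs against a warm cache.
timings = {"a": [], "b": []}
for order in (("a", "b"), ("b", "a")):
    for name in order:
        timings[name].append(median_seconds(conn, queries[name]))

for name, values in timings.items():
    print(name, ["%.4f sec" % v for v in values])
```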

[~nhanlon] Could you try the new query and confirm whether it helps in your 
situation? If it does, I'll try to fix the SQLAlchemy code.

> dag_stats endpoint has a terrible query
> ---------------------------------------
>
>                 Key: AIRFLOW-246
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-246
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: webserver
>    Affects Versions: Airflow 1.7.1
>         Environment: MySQL Backend through sqlalchemy
>            Reporter: Neil Hanlon
>
> Hitting this endpoint creates a series of queries on the database which take 
> over 20 seconds to run, causing the page to not load for that entire time. 
> Luckily the main page (which includes this under "Recent Statuses") loads 
> this asynchronously, but still... waiting almost half a minute (at times more) 
> to see the statuses for dags is really not fun.
> We have less than a million rows in the task_instance table--so it's not even 
> a problem with that.
> Here's a query profile for the query:
> https://gist.github.com/NeilHanlon/613f12724e802bc51c23fca7d46d28bf
> We've done some optimizations on the database, but to no avail.
> The query:
> {code:sql}
> SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state AS 
> task_instance_state, count(task_instance.task_id) AS count_1 FROM 
> task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, 
> dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 
> 'running') AS running_dag_run ON running_dag_run.dag_id = 
> task_instance.dag_id AND running_dag_run.execution_date = 
> task_instance.execution_date LEFT OUTER JOIN (SELECT dag_run.dag_id AS 
> dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY 
> dag_run.dag_id) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id 
> AND last_dag_run.execution_date = task_instance.execution_date WHERE 
> task_instance.task_id IN ... AND (running_dag_run.dag_id IS NOT NULL OR 
> last_dag_run.dag_id IS NOT NULL) GROUP BY task_instance.dag_id, 
> task_instance.state;
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
