[
https://issues.apache.org/jira/browse/AIRFLOW-246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15335724#comment-15335724
]
Kengo Seki commented on AIRFLOW-246:
------------------------------------
The multiple left outer joins seem to be what hurts performance. I think we
can rewrite the query in question by replacing the left outer joins with inner
joins whose results are combined with UNION ALL, such as:
{code:sql}
SELECT
    dag_id AS task_instance_dag_id,
    state AS task_instance_state,
    count(*) AS count_1
FROM (
    SELECT
        task_instance.dag_id,
        task_instance.state
    FROM
        task_instance
    JOIN (
        SELECT
            dag_run.dag_id AS dag_id,
            dag_run.execution_date AS execution_date
        FROM
            dag_run
        WHERE
            dag_run.state = 'running'
    ) AS running_dag_run
    ON
        running_dag_run.dag_id = task_instance.dag_id
    AND
        running_dag_run.execution_date = task_instance.execution_date
    WHERE
        task_id IN ...
    UNION ALL
    SELECT
        task_instance.dag_id,
        task_instance.state
    FROM
        task_instance
    JOIN (
        SELECT
            dag_run.dag_id AS dag_id,
            max(dag_run.execution_date) AS execution_date
        FROM
            dag_run
        GROUP BY
            dag_run.dag_id
    ) AS last_dag_run
    ON
        last_dag_run.dag_id = task_instance.dag_id
    AND
        last_dag_run.execution_date = task_instance.execution_date
    WHERE
        task_id IN ...
) t
GROUP BY
    dag_id,
    state;
{code}
I compared the two queries against some dummy data and saw roughly a 3-4x
speedup (about 0.39-0.40 sec for the original versus 0.11-0.13 sec for the
rewrite):
{code}
mysql> select count(*) from dag_run;
+----------+
| count(*) |
+----------+
|     3417 |
+----------+
1 row in set (0.00 sec)
mysql> select count(*) from task_instance;
+----------+
| count(*) |
+----------+
|   229089 |
+----------+
1 row in set (0.00 sec)
mysql> SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state
AS task_instance_state, count(task_instance.task_id) AS count_1 FROM
task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id,
dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state =
'running') AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id
AND running_dag_run.execution_date = task_instance.execution_date LEFT OUTER
JOIN (SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS
execution_date FROM dag_run GROUP BY dag_run.dag_id) AS last_dag_run ON
last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date =
task_instance.execution_date WHERE task_instance.task_id IN ('all_success',
'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching',
'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition',
'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1',
'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b',
'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join',
'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date',
'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1',
'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last',
'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3',
'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1',
'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5',
'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start',
'templated', 'test_trigger_dagrun', 'true_1', 'true_2') AND
(running_dag_run.dag_id IS NOT NULL OR last_dag_run.dag_id IS NOT NULL) GROUP
BY task_instance.dag_id, task_instance.state;
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.39 sec)
mysql> SELECT dag_id AS task_instance_dag_id, state AS task_instance_state,
count(*) as count_1 FROM ( SELECT task_instance.dag_id, task_instance.state
FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id,
dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state =
'running' ) AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id
AND running_dag_run.execution_date = task_instance.execution_date WHERE task_id
IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2',
'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d',
'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end',
'false_1', 'false_2', 'final_1', 'final_2', 'follow_branch_a',
'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op',
'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op',
'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning',
'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this',
'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1',
'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5',
'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3',
'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2',
'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun',
'true_1', 'true_2') UNION ALL SELECT task_instance.dag_id, task_instance.state
FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id,
max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY
dag_run.dag_id ) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id
AND last_dag_run.execution_date = task_instance.execution_date WHERE task_id IN
('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task',
'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition',
'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1',
'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b',
'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join',
'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date',
'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1',
'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last',
'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3',
'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1',
'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5',
'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start',
'templated', 'test_trigger_dagrun', 'true_1', 'true_2') ) t GROUP BY dag_id,
state;
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.11 sec)
mysql> -- swap execution order, just in case
mysql> reset query cache;
Query OK, 0 rows affected (0.00 sec)
mysql> SELECT dag_id AS task_instance_dag_id, state AS task_instance_state,
count(*) as count_1 FROM ( SELECT task_instance.dag_id, task_instance.state
FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id,
dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state =
'running' ) AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id
AND running_dag_run.execution_date = task_instance.execution_date WHERE task_id
IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2',
'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d',
'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end',
'false_1', 'false_2', 'final_1', 'final_2', 'follow_branch_a',
'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op',
'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op',
'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning',
'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this',
'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1',
'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5',
'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3',
'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2',
'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun',
'true_1', 'true_2') UNION ALL SELECT task_instance.dag_id, task_instance.state
FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id,
max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY
dag_run.dag_id ) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id
AND last_dag_run.execution_date = task_instance.execution_date WHERE task_id IN
('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task',
'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition',
'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1',
'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b',
'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join',
'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date',
'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1',
'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last',
'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3',
'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1',
'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5',
'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start',
'templated', 'test_trigger_dagrun', 'true_1', 'true_2') ) t GROUP BY dag_id,
state;
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.13 sec)
mysql> SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state
AS task_instance_state, count(task_instance.task_id) AS count_1 FROM
task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id,
dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state =
'running') AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id
AND running_dag_run.execution_date = task_instance.execution_date LEFT OUTER
JOIN (SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS
execution_date FROM dag_run GROUP BY dag_run.dag_id) AS last_dag_run ON
last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date =
task_instance.execution_date WHERE task_instance.task_id IN ('all_success',
'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching',
'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition',
'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1',
'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b',
'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join',
'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date',
'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1',
'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last',
'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3',
'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1',
'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5',
'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start',
'templated', 'test_trigger_dagrun', 'true_1', 'true_2') AND
(running_dag_run.dag_id IS NOT NULL OR last_dag_run.dag_id IS NOT NULL) GROUP
BY task_instance.dag_id, task_instance.state;
+-----------------------------------------+---------------------+---------+
| task_instance_dag_id                    | task_instance_state | count_1 |
+-----------------------------------------+---------------------+---------+
| example_bash_operator                   | success             |       6 |
| example_branch_dop_operator_v3          | NULL                |       3 |
| example_branch_operator                 | skipped             |       6 |
| example_branch_operator                 | success             |       5 |
| example_http_operator                   | failed              |       1 |
| example_http_operator                   | upstream_failed     |       5 |
| example_passing_params_via_test_command | success             |       2 |
| example_short_circuit_operator          | skipped             |       2 |
| example_short_circuit_operator          | success             |       4 |
| example_skip_dag                        | skipped             |       4 |
| example_skip_dag                        | success             |       4 |
| example_subdag_operator                 | success             |       5 |
| example_trigger_controller_dag          | success             |       1 |
| example_trigger_target_dag              | success             |       2 |
| example_xcom                            | success             |       3 |
| tutorial                                | success             |       3 |
+-----------------------------------------+---------------------+---------+
16 rows in set (0.40 sec)
{code}
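As a cross-check that the two forms return the same counts, here is a minimal
sketch using Python's built-in sqlite3 rather than MySQL. The schema is cut
down to the columns the queries touch, the rows are made up, the table aliases
(ti, r, l) and the run_both helper are hypothetical, and the task_id IN filter
is left out because it is identical in both queries. It also illustrates one
caveat that may be worth checking: if the latest run of a DAG is itself still
running, its task instances match both branches of the UNION ALL and get
counted twice, whereas the left-outer-join form counts them once.

```python
import sqlite3

# Toy schema: only the columns the two queries touch (an assumption, not the
# full Airflow schema).
SCHEMA = """
CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, state TEXT);
CREATE TABLE task_instance (
    dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT);
"""

# The two derived tables used by both query forms.
RUNNING = """(SELECT dag_id, execution_date FROM dag_run
              WHERE state = 'running')"""
LAST = """(SELECT dag_id, max(execution_date) AS execution_date
           FROM dag_run GROUP BY dag_id)"""

# Original shape: two left outer joins filtered with OR.
LEFT_JOIN_QUERY = f"""
SELECT ti.dag_id, ti.state, count(ti.task_id)
FROM task_instance ti
LEFT OUTER JOIN {RUNNING} AS r
  ON r.dag_id = ti.dag_id AND r.execution_date = ti.execution_date
LEFT OUTER JOIN {LAST} AS l
  ON l.dag_id = ti.dag_id AND l.execution_date = ti.execution_date
WHERE r.dag_id IS NOT NULL OR l.dag_id IS NOT NULL
GROUP BY ti.dag_id, ti.state
ORDER BY ti.dag_id, ti.state
"""

# Proposed shape: two inner joins combined with UNION ALL.
UNION_QUERY = f"""
SELECT dag_id, state, count(*) FROM (
    SELECT ti.dag_id, ti.state FROM task_instance ti
    JOIN {RUNNING} AS r
      ON r.dag_id = ti.dag_id AND r.execution_date = ti.execution_date
    UNION ALL
    SELECT ti.dag_id, ti.state FROM task_instance ti
    JOIN {LAST} AS l
      ON l.dag_id = ti.dag_id AND l.execution_date = ti.execution_date
) t
GROUP BY dag_id, state
ORDER BY dag_id, state
"""


def run_both(dag_runs, task_instances):
    """Load the rows into an in-memory database and run both query forms."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany("INSERT INTO dag_run VALUES (?, ?, ?)", dag_runs)
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?, ?, ?)", task_instances)
    left = conn.execute(LEFT_JOIN_QUERY).fetchall()
    union = conn.execute(UNION_QUERY).fetchall()
    conn.close()
    return left, union


if __name__ == "__main__":
    # An older run is still running; the latest run has finished.
    print(run_both(
        [("d1", "2016-01-01", "running"), ("d1", "2016-01-02", "success")],
        [("d1", "t1", "2016-01-01", "running"),
         ("d1", "t1", "2016-01-02", "success"),
         ("d1", "t2", "2016-01-02", "success")]))
    # The latest run is the running one: the two forms disagree here.
    print(run_both(
        [("d1", "2016-01-02", "running")],
        [("d1", "t1", "2016-01-02", "running")]))
```

In the first data set both forms agree. In the second, the left-outer-join
form reports a count of 1 and the UNION ALL form reports 2, so the rewrite
would need de-duplication (or a check that this overlap cannot occur) before
the sqlalchemy code is changed.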
[~nhanlon] Could you try the new query to confirm whether it is effective in
your situation? If it works, I'll try to fix the sqlalchemy code.
> dag_stats endpoint has a terrible query
> ---------------------------------------
>
> Key: AIRFLOW-246
> URL: https://issues.apache.org/jira/browse/AIRFLOW-246
> Project: Apache Airflow
> Issue Type: Bug
> Components: webserver
> Affects Versions: Airflow 1.7.1
> Environment: MySQL Backend through sqlalchemy
> Reporter: Neil Hanlon
>
> Hitting this endpoint creates a series of queries on the database which take
> over 20 seconds to run, causing the page to not load for that entire time.
> Luckily the main page (which includes this under "Recent Statuses") loads
> this asynchronously, but still... waiting almost half a minute (at times more)
> to see the statuses for dags is really not fun.
> We have fewer than a million rows in the task_instance table, so the table
> size is not even the problem.
> Here's a query profile for the query:
> https://gist.github.com/NeilHanlon/613f12724e802bc51c23fca7d46d28bf
> We've done some optimizations on the database, but to no avail.
> The query:
> {code:sql}
> SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state AS
> task_instance_state, count(task_instance.task_id) AS count_1 FROM
> task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id,
> dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state =
> 'running') AS running_dag_run ON running_dag_run.dag_id =
> task_instance.dag_id AND running_dag_run.execution_date =
> task_instance.execution_date LEFT OUTER JOIN (SELECT dag_run.dag_id AS
> dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY
> dag_run.dag_id) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id
> AND last_dag_run.execution_date = task_instance.execution_date WHERE
> task_instance.task_id IN ... AND (running_dag_run.dag_id IS NOT NULL OR
> last_dag_run.dag_id IS NOT NULL) GROUP BY task_instance.dag_id,
> task_instance.state;
> {code}