Repository: incubator-airflow Updated Branches: refs/heads/master 5479ac8d4 -> 6b2a3ca2e
[AIRFLOW-807] Improve scheduler performance for large DAGs MySQL's query optimizer selects the wrong index; this has a significant impact on the performance of the scheduler. Closes #2021 from criccomini/AIRFLOW-807 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6b2a3ca2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6b2a3ca2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6b2a3ca2 Branch: refs/heads/master Commit: 6b2a3ca2ee4ee3415ef72ea1fa3fc694350e9efc Parents: 5479ac8 Author: Chris Riccomini <[email protected]> Authored: Wed Jan 25 22:54:09 2017 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Wed Jan 25 22:54:18 2017 +0100 ---------------------------------------------------------------------- airflow/jobs.py | 6 ++++ ...7_add_dag_id_state_index_on_dag_run_table.py | 37 ++++++++++++++++++++ 2 files changed, 43 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6b2a3ca2/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 0ac3607..201d87f 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -538,12 +538,18 @@ class SchedulerJob(BaseJob): Where assuming that the scheduler runs often, so we only check for tasks that should have succeeded in the past hour. 
""" + if not any([ti.sla for ti in dag.tasks]): + self.logger.info("Skipping SLA check for {} because " + "no tasks in DAG have SLAs".format(dag)) + return + TI = models.TaskInstance sq = ( session .query( TI.task_id, func.max(TI.execution_date).label('max_ti')) + .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql') .filter(TI.dag_id == dag.dag_id) .filter(TI.state == State.SUCCESS) .filter(TI.task_id.in_(dag.task_ids)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6b2a3ca2/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py ---------------------------------------------------------------------- diff --git a/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py new file mode 100644 index 0000000..c500966 --- /dev/null +++ b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py @@ -0,0 +1,37 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Add dag_id/state index on dag_run table + +Revision ID: 127d2bf2dfa7 +Revises: 1a5a9e6bf2b5 +Create Date: 2017-01-25 11:43:51.635667 + +""" + +# revision identifiers, used by Alembic. 
+revision = '127d2bf2dfa7' +down_revision = '1a5a9e6bf2b5' +branch_labels = None +depends_on = None + +from alembic import op +import sqlalchemy as sa + +def upgrade(): + op.create_index('dag_id_state', 'dag_run', ['dag_id', 'state'], unique=False) + + +def downgrade(): + op.drop_index('dag_id_state', table_name='dag_run') +
