[AIRFLOW-803] Revert join with dag_runs in _execute_task_instances TaskInstances will be set to 'scheduled' if they meet the criteria to run, including the ones up for retry. No task_instance will be sent to the executor in any state other than 'scheduled'.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4edf9138 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4edf9138 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4edf9138 Branch: refs/heads/master Commit: 4edf9138d03fa4cbce5a1fc9059735d6f80f80f2 Parents: ac9167f Author: Bolke de Bruin <[email protected]> Authored: Wed Jan 25 11:54:30 2017 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Wed Jan 25 13:29:14 2017 +0100 ---------------------------------------------------------------------- airflow/jobs.py | 6 +--- .../1a5a9e6bf2b5_add_state_index_for_dagruns.py | 37 -------------------- 2 files changed, 1 insertion(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4edf9138/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 8bb93bb..3e3229f 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -34,7 +34,7 @@ import time from time import sleep import psutil -from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_ +from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_ from sqlalchemy.exc import OperationalError from sqlalchemy.orm.session import make_transient from tabulate import tabulate @@ -955,10 +955,6 @@ class SchedulerJob(BaseJob): .query(TI) .filter(TI.dag_id.in_(simple_dag_bag.dag_ids)) .filter(TI.state.in_(states)) - .join(DagRun, and_(TI.dag_id == DagRun.dag_id, - TI.execution_date == DagRun.execution_date, - DagRun.state == State.RUNNING, - DagRun.run_id.like(DagRun.ID_PREFIX + '%'))) .all() ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4edf9138/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py 
---------------------------------------------------------------------- diff --git a/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py b/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py deleted file mode 100644 index 29ffaf1..0000000 --- a/airflow/migrations/versions/1a5a9e6bf2b5_add_state_index_for_dagruns.py +++ /dev/null @@ -1,37 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Add state index for dagruns to allow the quick lookup of active dagruns - -Revision ID: 1a5a9e6bf2b5 -Revises: 5e7d17757c7a -Create Date: 2017-01-17 10:22:53.193711 - -""" - -# revision identifiers, used by Alembic. -revision = '1a5a9e6bf2b5' -down_revision = '5e7d17757c7a' -branch_labels = None -depends_on = None - -from alembic import op -import sqlalchemy as sa - - -def upgrade(): - op.create_index('dr_state', 'dag_run', ['state'], unique=False) - - -def downgrade(): - op.drop_index('state', table_name='dag_run')
