Repository: incubator-airflow Updated Branches: refs/heads/master 21c142051 -> 4764646b1
[AIRFLOW-1166] Speed up _change_state_for_tis_without_dagrun _change_state_for_tis_without_dagrun was locking a significant number of tasks unnecessarily. This could end up in a deadlock in the database due to how long the lock was held. Closes #2267 from bolkedebruin/fix_deadlock Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4764646b Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4764646b Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4764646b Branch: refs/heads/master Commit: 4764646b18f56c34a35c19bd20a1931eb3a844fe Parents: 21c1420 Author: Bolke de Bruin <[email protected]> Authored: Wed Jun 7 09:16:51 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Wed Jun 7 09:16:51 2017 +0200 ---------------------------------------------------------------------- airflow/jobs.py | 76 +++++++++++++++++++++++++--------------------------- tests/jobs.py | 71 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4764646b/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index adc4328..508564a 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -35,7 +35,8 @@ import time from time import sleep import psutil -from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_ +from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_ +from sqlalchemy import update from sqlalchemy.exc import OperationalError from sqlalchemy.orm.session import make_transient from tabulate import tabulate @@ -931,45 +932,42 @@ class SchedulerJob(BaseJob): simple_dag_bag and with states in the old_state will be examined :type simple_dag_bag: SimpleDagBag """ + 
tis_changed = 0 + if self.using_sqlite: + tis_to_change = ( + session + .query(models.TaskInstance) + .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids)) + .filter(models.TaskInstance.state.in_(old_states)) + .filter(and_( + models.DagRun.dag_id == models.TaskInstance.dag_id, + models.DagRun.execution_date == models.TaskInstance.execution_date, + models.DagRun.state != State.RUNNING)) + .with_for_update() + .all() + ) + for ti in tis_to_change: + ti.set_state(new_state, session=session) + tis_changed += 1 + else: + tis_changed = ( + session + .query(models.TaskInstance) + .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids)) + .filter(models.TaskInstance.state.in_(old_states)) + .filter(and_( + models.DagRun.dag_id == models.TaskInstance.dag_id, + models.DagRun.execution_date == models.TaskInstance.execution_date, + models.DagRun.state != State.RUNNING)) + .update({models.TaskInstance.state: new_state}, + synchronize_session=False) + ) + session.commit() - task_instances_to_change = ( - session - .query(models.TaskInstance) - .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids)) - .filter(models.TaskInstance.state.in_(old_states)) - .with_for_update() - .all() - ) - """:type: list[TaskInstance]""" - - for task_instance in task_instances_to_change: - dag_runs = DagRun.find(dag_id=task_instance.dag_id, - execution_date=task_instance.execution_date, - ) - - if len(dag_runs) == 0: - self.logger.warning("DagRun for %s %s does not exist", - task_instance.dag_id, - task_instance.execution_date) - continue - - # There should only be one DAG run. Add some logging info if this - # is not the case for later debugging. 
- if len(dag_runs) > 1: - self.logger.warning("Multiple DagRuns found for {} {}: {}" - .format(task_instance.dag_id, - task_instance.execution_date, - dag_runs)) - - if not any(dag_run.state == State.RUNNING for dag_run in dag_runs): - self.logger.warning("Setting {} to state={} as it does not have " - "a DagRun in the {} state" - .format(task_instance, - new_state, - State.RUNNING)) - task_instance.state = new_state - session.merge(task_instance) - session.commit() + if tis_changed > 0: + self.logger.warning("Set {} task instances to state={} as their associated " + "DagRun was not in RUNNING state".format( + tis_changed, new_state)) @provide_session def _execute_task_instances(self, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4764646b/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index b0763b9..fb6fcdd 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -613,6 +613,77 @@ class SchedulerJobTest(unittest.TestCase): session.close() + def test_change_state_for_tis_without_dagrun(self): + dag = DAG( + dag_id='test_change_state_for_tis_without_dagrun', + start_date=DEFAULT_DATE) + + DummyOperator( + task_id='dummy', + dag=dag, + owner='airflow') + + dag2 = DAG( + dag_id='test_change_state_for_tis_without_dagrun_dont_change', + start_date=DEFAULT_DATE) + + DummyOperator( + task_id='dummy', + dag=dag2, + owner='airflow') + + session = settings.Session() + dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX, + state=State.RUNNING, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE, + session=session) + + dr2 = dag2.create_dagrun(run_id=DagRun.ID_PREFIX, + state=State.RUNNING, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE, + session=session) + + ti = dr.get_task_instance(task_id='dummy', session=session) + ti.state = State.SCHEDULED + session.commit() + + ti2 = dr2.get_task_instance(task_id='dummy', session=session) + ti2.state = State.SCHEDULED + session.commit() 
+ + dagbag = SimpleDagBag([dag]) + scheduler = SchedulerJob(num_runs=0, run_duration=0) + scheduler._change_state_for_tis_without_dagrun(simple_dag_bag=dagbag, + old_states=[State.SCHEDULED, State.QUEUED], + new_state=State.NONE, + session=session) + + ti.refresh_from_db(session=session) + self.assertEqual(ti.state, State.SCHEDULED) + + ti2.refresh_from_db(session=session) + self.assertEqual(ti2.state, State.SCHEDULED) + + dr.refresh_from_db(session=session) + dr.state = State.FAILED + + # why o why + session.merge(dr) + session.commit() + + scheduler._change_state_for_tis_without_dagrun(simple_dag_bag=dagbag, + old_states=[State.SCHEDULED, State.QUEUED], + new_state=State.NONE, + session=session) + ti.refresh_from_db(session=session) + self.assertEqual(ti.state, State.NONE) + + # don't touch ti2 + ti2.refresh_from_db(session=session) + self.assertEqual(ti2.state, State.SCHEDULED) + def test_execute_helper_reset_orphaned_tasks(self): session = settings.Session() dag = DAG(
