Repository: incubator-airflow Updated Branches: refs/heads/master 28aeed4aa -> e05d3b4df
[AIRFLOW-1059] Reset orphaned tasks in batch for scheduler. The current implementation resets state for tasks one DagRun at a time. We should be able to do this in larger batches, which will improve scheduler startup time. Closes #2205 from saguziel/aguziel-reset-state Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e05d3b4d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e05d3b4d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e05d3b4d Branch: refs/heads/master Commit: e05d3b4df0013f0cff804dfbd1db0197f320de48 Parents: 28aeed4 Author: Alex Guziel <[email protected]> Authored: Fri Jul 14 16:37:13 2017 -0700 Committer: Alex Guziel <[email protected]> Committed: Fri Jul 14 16:37:13 2017 -0700 ---------------------------------------------------------------------- airflow/jobs.py | 91 +++++++++++++++------- tests/jobs.py | 214 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 277 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e05d3b4d/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index f8ab1fa..e8431b7 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -35,7 +35,8 @@ import time from time import sleep import psutil -from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_, not_ +from sqlalchemy import ( + Column, Integer, String, DateTime, func, Index, or_, and_, not_) from sqlalchemy.exc import OperationalError from sqlalchemy.orm.session import make_transient from tabulate import tabulate @@ -212,27 +213,73 @@ class BaseJob(Base, LoggingMixin): raise NotImplementedError("This method needs to be overridden") @provide_session - def reset_state_for_orphaned_tasks(self, dag_run, session=None): + 
def reset_state_for_orphaned_tasks(self, filter_by_dag_run=None, session=None): """ - This function checks for a DagRun if there are any tasks + This function checks if there are any tasks in the dagrun (or all) that have a scheduled state but are not known by the executor. If it finds those it will reset the state to None so they will get picked up again. + The batch option is for performance reasons as the queries are made in + sequence. + + :param filter_by_dag_run: the dag_run we want to process, None if all + :type filter_by_dag_run: models.DagRun + :return: the TIs reset (in expired SQLAlchemy state) + :rtype: List(TaskInsance) """ queued_tis = self.executor.queued_tasks - # also consider running as the state might not have changed in the db yet - running = self.executor.running - tis = list() - tis.extend(dag_run.get_task_instances(state=State.SCHEDULED, session=session)) - tis.extend(dag_run.get_task_instances(state=State.QUEUED, session=session)) - - for ti in tis: - if ti.key not in queued_tis and ti.key not in running: - self.logger.debug("Rescheduling orphaned task {}".format(ti)) - ti.state = State.NONE + running_tis = self.executor.running + + resettable_states = [State.SCHEDULED, State.QUEUED] + TI = models.TaskInstance + DR = models.DagRun + if filter_by_dag_run is None: + resettable_tis = ( + session + .query(TI) + .join( + DR, + and_( + TI.dag_id == DR.dag_id, + TI.execution_date == DR.execution_date)) + .filter( + DR.state == State.RUNNING, + DR.external_trigger.is_(False), + DR.run_id.notlike(BackfillJob.ID_PREFIX + '%'), + TI.state.in_(resettable_states))).all() + else: + resettable_tis = filter_by_dag_run.get_task_instances(state=resettable_states, + session=session) + tis_to_reset = [] + # Can't use an update here since it doesn't support joins + for ti in resettable_tis: + if ti.key not in queued_tis and ti.key not in running_tis: + tis_to_reset.append(ti) + + filter_for_tis = ([and_(TI.dag_id == ti.dag_id, + TI.task_id == ti.task_id, + 
TI.execution_date == ti.execution_date) + for ti in tis_to_reset]) + if len(tis_to_reset) == 0: + return [] + reset_tis = ( + session + .query(TI) + .filter(or_(*filter_for_tis), TI.state.in_(resettable_states)) + .with_for_update() + .all()) + for ti in reset_tis: + ti.state = State.NONE + session.merge(ti) + task_instance_str = '\n\t'.join( + ["{}".format(x) for x in reset_tis]) session.commit() + self.logger.info("Reset the following {} TaskInstances:\n\t{}" + .format(len(reset_tis), task_instance_str)) + return reset_tis + class DagFileProcessor(AbstractDagFileProcessor): """Helps call SchedulerJob.process_file() in a separate process.""" @@ -1354,19 +1401,8 @@ class SchedulerJob(BaseJob): self.executor.start() session = settings.Session() - self.logger.info("Resetting state for orphaned tasks") - # grab orphaned tasks and make sure to reset their state - active_runs = DagRun.find( - state=State.RUNNING, - external_trigger=False, - session=session, - no_backfills=True, - ) - for dr in active_runs: - self.logger.info("Resetting {} {}".format(dr.dag_id, - dr.execution_date)) - self.reset_state_for_orphaned_tasks(dr, session=session) - + self.logger.info("Resetting orphaned tasks for active dag runs") + self.reset_state_for_orphaned_tasks(session=session) session.close() execute_start_time = datetime.now() @@ -1828,8 +1864,7 @@ class BackfillJob(BaseJob): run.run_id = run_id run.verify_integrity(session=session) - # check if we have orphaned tasks - self.reset_state_for_orphaned_tasks(dag_run=run, session=session) + self.reset_state_for_orphaned_tasks(filter_by_dag_run=run, session=session) # for some reason if we dont refresh the reference to run is lost run.refresh_from_db() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e05d3b4d/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 6e6150b..13bd9f5 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -2028,3 +2028,217 @@ 
class SchedulerJobTest(unittest.TestCase): for file_path in list_py_file_paths(TEST_DAGS_FOLDER): detected_files.append(file_path) self.assertEqual(sorted(detected_files), sorted(expected_files)) + + def test_reset_orphaned_tasks_nothing(self): + """Try with nothing. """ + scheduler = SchedulerJob(**self.default_scheduler_args) + session = settings.Session() + self.assertEqual(0, len(scheduler.reset_state_for_orphaned_tasks(session=session))) + + def test_reset_orphaned_tasks_external_triggered_dag(self): + dag_id = 'test_reset_orphaned_tasks_external_triggered_dag' + dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') + task_id = dag_id + '_task' + task = DummyOperator(task_id=task_id, dag=dag) + + scheduler = SchedulerJob(**self.default_scheduler_args) + session = settings.Session() + + dr1 = scheduler.create_dag_run(dag, session=session) + ti = dr1.get_task_instances(session=session)[0] + dr1.state = State.RUNNING + ti.state = State.SCHEDULED + dr1.external_trigger = True + session.merge(ti) + session.merge(dr1) + session.commit() + + self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session))) + + def test_reset_orphaned_tasks_backfill_dag(self): + dag_id = 'test_reset_orphaned_tasks_backfill_dag' + dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') + task_id = dag_id + '_task' + task = DummyOperator(task_id=task_id, dag=dag) + + scheduler = SchedulerJob(**self.default_scheduler_args) + session = settings.Session() + + dr1 = scheduler.create_dag_run(dag, session=session) + ti = dr1.get_task_instances(session=session)[0] + ti.state = State.SCHEDULED + dr1.state = State.RUNNING + dr1.run_id = BackfillJob.ID_PREFIX + '_sdfsfdfsd' + session.merge(ti) + session.merge(dr1) + session.commit() + + self.assertTrue(dr1.is_backfill) + self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session))) + + def test_reset_orphaned_tasks_specified_dagrun(self): + """Try to reset 
when we specify a dagrun and ensure nothing else is.""" + dag_id = 'test_reset_orphaned_tasks_specified_dagrun' + dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') + task_id = dag_id + '_task' + task = DummyOperator(task_id=task_id, dag=dag) + + scheduler = SchedulerJob(**self.default_scheduler_args) + session = settings.Session() + # make two dagruns, only reset for one + dr1 = scheduler.create_dag_run(dag) + dr2 = scheduler.create_dag_run(dag) + dr1.state = State.SUCCESS + dr2.state = State.RUNNING + ti1 = dr1.get_task_instances(session=session)[0] + ti2 = dr2.get_task_instances(session=session)[0] + ti1.state = State.SCHEDULED + ti2.state = State.SCHEDULED + + session.merge(ti1) + session.merge(ti2) + session.merge(dr1) + session.merge(dr2) + session.commit() + + reset_tis = scheduler.reset_state_for_orphaned_tasks(filter_by_dag_run=dr2, session=session) + self.assertEquals(1, len(reset_tis)) + ti1.refresh_from_db(session=session) + ti2.refresh_from_db(session=session) + self.assertEquals(State.SCHEDULED, ti1.state) + self.assertEquals(State.NONE, ti2.state) + + def test_reset_orphaned_tasks_nonexistent_dagrun(self): + """Make sure a task in an orphaned state is not reset if it has no dagrun. 
""" + dag_id = 'test_reset_orphaned_tasks_nonexistent_dagrun' + dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') + task_id = dag_id + '_task' + task = DummyOperator(task_id=task_id, dag=dag) + + scheduler = SchedulerJob(**self.default_scheduler_args) + session = settings.Session() + + ti = models.TaskInstance(task=task, execution_date=DEFAULT_DATE) + session.add(ti) + session.commit() + + ti.refresh_from_db() + ti.state = State.SCHEDULED + session.merge(ti) + session.commit() + + self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session))) + + def test_reset_orphaned_tasks_no_orphans(self): + dag_id = 'test_reset_orphaned_tasks_no_orphans' + dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') + task_id = dag_id + '_task' + task = DummyOperator(task_id=task_id, dag=dag) + + scheduler = SchedulerJob(**self.default_scheduler_args) + session = settings.Session() + + dr1 = scheduler.create_dag_run(dag) + dr1.state = State.RUNNING + tis = dr1.get_task_instances(session=session) + tis[0].state = State.RUNNING + session.merge(dr1) + session.merge(tis[0]) + session.commit() + + self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session))) + tis[0].refresh_from_db() + self.assertEquals(State.RUNNING, tis[0].state) + + def test_reset_orphaned_tasks_non_running_dagruns(self): + """Ensure orphaned tasks with non-running dagruns are not reset.""" + dag_id = 'test_reset_orphaned_tasks_non_running_dagruns' + dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') + task_id = dag_id + '_task' + task = DummyOperator(task_id=task_id, dag=dag) + + scheduler = SchedulerJob(**self.default_scheduler_args) + session = settings.Session() + + dr1 = scheduler.create_dag_run(dag) + dr1.state = State.SUCCESS + tis = dr1.get_task_instances(session=session) + self.assertEquals(1, len(tis)) + tis[0].state = State.SCHEDULED + session.merge(dr1) + session.merge(tis[0]) + 
session.commit() + + self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session))) + + def test_reset_orphaned_tasks_with_orphans(self): + """Create dagruns and esnure only ones with correct states are reset.""" + prefix = 'scheduler_job_test_test_reset_orphaned_tasks' + states = [State.QUEUED, State.SCHEDULED, State.NONE, State.RUNNING, State.SUCCESS] + states_to_reset = [State.QUEUED, State.SCHEDULED, State.NONE] + + dag = DAG(dag_id=prefix, + start_date=DEFAULT_DATE, + schedule_interval="@daily") + tasks = [] + for i in range(len(states)): + task_id = "{}_task_{}".format(prefix, i) + task = DummyOperator(task_id=task_id, dag=dag) + tasks.append(task) + + scheduler = SchedulerJob(**self.default_scheduler_args) + + session = settings.Session() + + # create dagruns + dr1 = scheduler.create_dag_run(dag) + dr2 = scheduler.create_dag_run(dag) + dr1.state = State.RUNNING + dr2.state = State.SUCCESS + session.merge(dr1) + session.merge(dr2) + session.commit() + + # create taskinstances and set states + dr1_tis = [] + dr2_tis = [] + for i, (task, state) in enumerate(zip(tasks, states)): + ti1 = TI(task, dr1.execution_date) + ti2 = TI(task, dr2.execution_date) + ti1.refresh_from_db() + ti2.refresh_from_db() + ti1.state = state + ti2.state = state + dr1_tis.append(ti1) + dr2_tis.append(ti2) + session.merge(ti1) + session.merge(ti2) + session.commit() + + self.assertEqual(2, len(scheduler.reset_state_for_orphaned_tasks(session=session))) + + for ti in dr1_tis + dr2_tis: + ti.refresh_from_db() + + # running dagrun should be reset + for state, ti in zip(states, dr1_tis): + if state in states_to_reset: + self.assertIsNone(ti.state) + else: + self.assertEqual(state, ti.state) + + # otherwise not + for state, ti in zip(states, dr2_tis): + self.assertEqual(state, ti.state) + + for state, ti in zip(states, dr1_tis): + ti.state = state + session.commit() + + scheduler.reset_state_for_orphaned_tasks(filter_by_dag_run=dr1, session=session) + + # check same for 
dag_run version + for state, ti in zip(states, dr2_tis): + self.assertEqual(state, ti.state) + + session.close()
