Repository: incubator-airflow
Updated Branches:
  refs/heads/master f1f022c1e -> 3547cbffd
[AIRFLOW-1438] Change batch size per query in scheduler

This should help if query size is limited. It also reduces how long locks are held.

Closes #2462 from saguziel/aguziel-paginate-query


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3547cbff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3547cbff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3547cbff

Branch: refs/heads/master
Commit: 3547cbffdbffac2f98a8aa05526e8c9671221025
Parents: f1f022c
Author: Alex Guziel <[email protected]>
Authored: Fri Jul 21 13:59:14 2017 -0700
Committer: Alex Guziel <[email protected]>
Committed: Fri Jul 21 13:59:14 2017 -0700

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg |  5 +++
 airflow/config_templates/default_test.cfg    |  1 +
 airflow/jobs.py                              | 45 ++++++++++++++++++-----
 tests/jobs.py                                | 38 +++++++++++++++++++
 4 files changed, 80 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3547cbff/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index ddd1ba8..33cee39 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -325,6 +325,11 @@ scheduler_zombie_task_threshold = 300
 # DAG definition (catchup)
 catchup_by_default = True
 
+# This changes the batch size of queries in the scheduling main loop.
+# This depends on query length limits and how long you are willing to hold locks.
+# 0 for no limit +max_tis_per_query = 0 + # Statsd (https://github.com/etsy/statsd) integration settings statsd_on = False statsd_host = localhost http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3547cbff/airflow/config_templates/default_test.cfg ---------------------------------------------------------------------- diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg index f6650af..88b19a5 100644 --- a/airflow/config_templates/default_test.cfg +++ b/airflow/config_templates/default_test.cfg @@ -85,6 +85,7 @@ max_threads = 2 catchup_by_default = True scheduler_zombie_task_threshold = 300 dag_dir_list_interval = 0 +max_tis_per_query = 0 [admin] hide_sensitive_variable_fields = True http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3547cbff/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 6b63df0..e2f8c94 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -595,6 +595,7 @@ class SchedulerJob(BaseJob): # Directory where log files for the processes that scheduled the DAGs reside self.child_process_log_directory = conf.get('scheduler', 'child_process_log_directory') + self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query') if run_duration is None: self.run_duration = conf.getint('scheduler', 'run_duration') @@ -1146,6 +1147,15 @@ class SchedulerJob(BaseJob): ["{}".format(x) for x in executable_tis]) self.logger.info("Setting the follow tasks to queued state:\n\t{}" .format(task_instance_str)) + # so these dont expire on commit + for ti in executable_tis: + copy_dag_id = ti.dag_id + copy_execution_date = ti.execution_date + copy_task_id = ti.task_id + make_transient(ti) + ti.dag_id = copy_dag_id + ti.execution_date = copy_execution_date + ti.task_id = copy_task_id return executable_tis @provide_session @@ -1289,15 +1299,32 @@ class SchedulerJob(BaseJob): """ executable_tis = 
self._find_executable_task_instances(simple_dag_bag, states, session=session) - tis_with_state_changed = self._change_state_for_executable_task_instances( - executable_tis, - states, - session=session) - self._enqueue_task_instances_with_queued_state( - simple_dag_bag, - tis_with_state_changed) - session.commit() - return len(tis_with_state_changed) + if self.max_tis_per_query == 0: + tis_with_state_changed = self._change_state_for_executable_task_instances( + executable_tis, + states, + session=session) + self._enqueue_task_instances_with_queued_state( + simple_dag_bag, + tis_with_state_changed) + session.commit() + return len(tis_with_state_changed) + else: + # makes chunks of max_tis_per_query size + chunks = ([executable_tis[i:i + self.max_tis_per_query] + for i in range(0, len(executable_tis), self.max_tis_per_query)]) + total_tis_queued = 0 + for chunk in chunks: + tis_with_state_changed = self._change_state_for_executable_task_instances( + chunk, + states, + session=session) + self._enqueue_task_instances_with_queued_state( + simple_dag_bag, + tis_with_state_changed) + session.commit() + total_tis_queued += len(tis_with_state_changed) + return total_tis_queued def _process_dags(self, dagbag, dags, tis_out): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3547cbff/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index e987e0c..c9ab742 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -1037,6 +1037,44 @@ class SchedulerJobTest(unittest.TestCase): six.assertCountEqual(self, [State.QUEUED, State.SCHEDULED], [ti3.state, ti4.state]) self.assertEqual(1, res) + def test_execute_task_instances_limit(self): + dag_id = 'SchedulerJobTest.test_execute_task_instances_limit' + task_id_1 = 'dummy_task' + task_id_2 = 'dummy_task_2' + # important that len(tasks) is less than concurrency + # because before scheduler._execute_task_instances would only + # check the num tasks once so 
if concurrency was 3, + # we could execute arbitrarily many tasks in the second run + dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) + task1 = DummyOperator(dag=dag, task_id=task_id_1) + task2 = DummyOperator(dag=dag, task_id=task_id_2) + dagbag = SimpleDagBag([dag]) + + scheduler = SchedulerJob(**self.default_scheduler_args) + scheduler.max_tis_per_query = 3 + session = settings.Session() + + tis = [] + for i in range(0, 4): + dr = scheduler.create_dag_run(dag) + ti1 = TI(task1, dr.execution_date) + ti2 = TI(task2, dr.execution_date) + tis.append(ti1) + tis.append(ti2) + ti1.refresh_from_db() + ti2.refresh_from_db() + ti1.state = State.SCHEDULED + ti2.state = State.SCHEDULED + session.merge(ti1) + session.merge(ti2) + session.commit() + res = scheduler._execute_task_instances(dagbag, [State.SCHEDULED]) + + self.assertEqual(8, res) + for ti in tis: + ti.refresh_from_db() + self.assertEqual(State.QUEUED, ti.state) + def test_change_state_for_tis_without_dagrun(self): dag = DAG( dag_id='test_change_state_for_tis_without_dagrun',
