This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 6383f75c21269866c373e3cdfb7323de7ee224b0
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Nov 25 09:33:19 2020 +0000

    Rename `[scheduler] max_threads` to `[scheduler] parsing_processes` (#12605)

    From Airflow 1.10.14, the `max_threads` config under the `[scheduler]`
    section has been renamed to `parsing_processes`.

    This aligns the name with the actual code, where the Scheduler launches
    the number of processes defined by `[scheduler] parsing_processes` to
    parse DAG files, calculate the next DagRun date for each DAG, serialize
    them, and store them in the DB.

    (cherry picked from commit 486134426bf2cd54fae1f75d9bd50715b8369ca1)
---
 UPDATING.md                                  | 7 +++++++
 airflow/config_templates/config.yml          | 6 +++---
 airflow/config_templates/default_airflow.cfg | 6 +++---
 airflow/config_templates/default_test.cfg    | 2 +-
 airflow/configuration.py                     | 3 +++
 airflow/jobs/scheduler_job.py                | 2 +-
 airflow/utils/dag_processing.py              | 4 ++--
 docs/faq.rst                                 | 7 +++++--
 scripts/in_container/airflow_ci.cfg          | 2 +-
 tests/utils/test_dag_processing.py           | 2 +-
 10 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index 577b644..4ad226a 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -64,6 +64,13 @@ https://developers.google.com/style/inclusive-documentation
 -->
 ## Airflow 1.10.14

+### `[scheduler] max_threads` config has been renamed to `[scheduler] parsing_processes`
+
+From Airflow 1.10.14, `max_threads` config under `[scheduler]` section has been renamed to `parsing_processes`.
+
+This is to align the name with the actual code where the Scheduler launches the number of processes defined by
+`[scheduler] parsing_processes` to parse the DAG files.
+
 ### Airflow CLI changes in line with 2.0

 The Airflow CLI has been organized so that related commands are grouped together as subcommands,

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index e89df22..87ee928 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1439,10 +1439,10 @@
       type: string
       example: ~
       default: ""
-    - name: max_threads
+    - name: parsing_processes
       description: |
-        The scheduler can run multiple threads in parallel to schedule dags.
-        This defines how many threads will run.
+        The scheduler can run multiple processes in parallel to parse dags.
+        This defines how many processes will run.
       version_added: ~
       type: string
       example: ~

diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index ea64d8f..662fd00 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -695,9 +695,9 @@ statsd_prefix = airflow
 # start with the elements of the list (e.g: scheduler,executor,dagrun)
 statsd_allow_list =

-# The scheduler can run multiple threads in parallel to schedule dags.
-# This defines how many threads will run.
-max_threads = 2
+# The scheduler can run multiple processes in parallel to parse dags.
+# This defines how many processes will run.
+parsing_processes = 2

 authenticate = False
 # Turn off scheduler use of cron intervals by setting this to False.
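[Editor's note] For anyone upgrading, the `default_airflow.cfg` hunk above boils
down to a one-line rename in an operator's own `airflow.cfg`. A minimal sketch
of the edit, using the shipped default of 2 as an illustrative value:

    [scheduler]
    # Before Airflow 1.10.14:
    # max_threads = 2
    # From Airflow 1.10.14 onwards:
    parsing_processes = 2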
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index 3ac2225..30a82a4 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -113,7 +113,7 @@ job_heartbeat_sec = 1
 scheduler_heartbeat_sec = 5
 scheduler_health_check_threshold = 30
 authenticate = true
-max_threads = 2
+parsing_processes = 2
 catchup_by_default = True
 scheduler_zombie_task_threshold = 300
 dag_dir_list_interval = 0

diff --git a/airflow/configuration.py b/airflow/configuration.py
index 290843f..16081a3 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -183,6 +183,9 @@ class AirflowConfigParser(ConfigParser):
             'json_format': 'elasticsearch_json_format',
             'json_fields': 'elasticsearch_json_fields'
+        },
+        'scheduler': {
+            'parsing_processes': 'max_threads'
         }
     }
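[Editor's note] The `deprecated_options` entry above is what keeps existing
configs working: when code asks for the new key and only the old one is set,
`AirflowConfigParser` falls back to the deprecated `max_threads` value and
emits a deprecation warning. A minimal sketch of the resulting behaviour,
assuming only the old key is present (the exact warning wording is not shown
in this commit):

    from airflow.configuration import conf

    # airflow.cfg still contains only `max_threads = 4` under [scheduler];
    # this lookup resolves to 4 via the deprecated_options mapping and
    # warns the user to rename the option.
    parallelism = conf.getint('scheduler', 'parsing_processes')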
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 9149699..b72e2b1 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -391,7 +391,7 @@ class SchedulerJob(BaseJob):
         self.do_pickle = do_pickle
         super(SchedulerJob, self).__init__(*args, **kwargs)

-        self.max_threads = conf.getint('scheduler', 'max_threads')
+        self.max_threads = conf.getint('scheduler', 'parsing_processes')

         if log:
             self._log = log

diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 4a4b240..881a8ce 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -769,10 +769,10 @@ class DagFileProcessorManager(LoggingMixin):
         self._dag_ids = dag_ids
         self._async_mode = async_mode

-        self._parallelism = conf.getint('scheduler', 'max_threads')
+        self._parallelism = conf.getint('scheduler', 'parsing_processes')
         if 'sqlite' in conf.get('core', 'sql_alchemy_conn') and self._parallelism > 1:
             self.log.warning(
-                "Because we cannot use more than 1 thread (max_threads = {}) "
+                "Because we cannot use more than 1 thread (parsing_processes = {}) "
                 "when using sqlite. So we set parallelism to 1.".format(self._parallelism)
             )
             self._parallelism = 1

diff --git a/docs/faq.rst b/docs/faq.rst
index 80849e0..c041e0a 100644
--- a/docs/faq.rst
+++ b/docs/faq.rst
@@ -205,8 +205,11 @@ This means ``explicit_defaults_for_timestamp`` is disabled in your mysql server
 How to reduce airflow dag scheduling latency in production?
 -----------------------------------------------------------

-- ``max_threads``: Scheduler will spawn multiple threads in parallel to schedule dags. This is controlled by ``max_threads`` with default value of 2. User should increase this value to a larger value (e.g numbers of cpus where scheduler runs - 1) in production.
-- ``scheduler_heartbeat_sec``: User should consider to increase ``scheduler_heartbeat_sec`` config to a higher value (e.g 60 secs) which controls how frequent the airflow scheduler gets the heartbeat and updates the job's entry in database.
+- ``parsing_processes``: Scheduler will spawn multiple threads in parallel to parse dags.
+  This is controlled by ``parsing_processes`` with default value of 2.
+  User should increase this value to a larger value (e.g numbers of cpus where scheduler runs + 1) in production.
+- ``scheduler_heartbeat_sec``: User should consider to increase ``scheduler_heartbeat_sec`` config to a higher value (e.g 60 secs) which controls how frequent the airflow scheduler gets the heartbeat
+  and updates the job's entry in database.

 Why next_ds or prev_ds might not contain expected values?

diff --git a/scripts/in_container/airflow_ci.cfg b/scripts/in_container/airflow_ci.cfg
index b097752..4933af0 100644
--- a/scripts/in_container/airflow_ci.cfg
+++ b/scripts/in_container/airflow_ci.cfg
@@ -52,4 +52,4 @@ _test_only_string = this is a test
 job_heartbeat_sec = 1
 scheduler_heartbeat_sec = 5
 authenticate = true
-max_threads = 2
+parsing_processes = 2

diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index 3726a8d..b2bdf35 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -267,7 +267,7 @@ class TestDagFileProcessorManager(unittest.TestCase):
         file processors until the next zombie detection logic is invoked.
         """
         test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_example_bash_operator.py')
-        with conf_vars({('scheduler', 'max_threads'): '1',
+        with conf_vars({('scheduler', 'parsing_processes'): '1',
                         ('core', 'load_examples'): 'False'}):
             dagbag = DagBag(test_dag_path)
             with create_session() as session:
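[Editor's note] The test hunk above pins the renamed option with the
`conf_vars` helper; the same pattern works for any test that needs a specific
parsing parallelism. A minimal sketch, assuming the helper's 1.10 import path
(`tests.test_utils.config`) and a hypothetical test body:

    from tests.test_utils.config import conf_vars

    with conf_vars({('scheduler', 'parsing_processes'): '1'}):
        # Code under test now sees a single parsing process, matching the
        # sqlite constraint enforced in dag_processing.py above.
        run_scheduler_test()  # hypothetical test body, not from this commit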
