This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 15d42b4320 Reduce default for max TIs per query, enforce <= parallelism (#32572)
15d42b4320 is described below

commit 15d42b4320d535cf54743929f134e36f59c615bb
Author: Raphaël Vandon <[email protected]>
AuthorDate: Thu Jul 20 13:29:20 2023 -0700

    Reduce default for max TIs per query, enforce <= parallelism (#32572)
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    Co-authored-by: Simon Liu <[email protected]>
    Co-authored-by: lyso <[email protected]>
    Co-authored-by: root <root@GamePC>
---
 airflow/config_templates/config.yml                |  5 ++--
 airflow/configuration.py                           | 20 ++++++++++++++++
 .../administration-and-deployment/scheduler.rst    |  7 +++---
 newsfragments/32572.significant.rst                | 10 ++++++++
 tests/core/test_configuration.py                   | 27 ++++++++++++++++++++++
 5 files changed, 64 insertions(+), 5 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 0629cb5d11..fbaac698b0 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2463,14 +2463,15 @@ scheduler:
     max_tis_per_query:
       description: |
         This changes the batch size of queries in the scheduling main loop.
+        This should not be greater than ``core.parallelism``.
         If this is too high, SQL query performance may be impacted by
         complexity of query predicate, and/or excessive locking.
         Additionally, you may hit the maximum allowable query length for your db.
-        Set this to 0 for no limit (not advised)
+        Set this to 0 to use the value of ``core.parallelism``
       version_added: ~
       type: integer
       example: ~
-      default: "512"
+      default: "16"
     use_row_level_locking:
       description: |
         Should the scheduler issue ``SELECT ... FOR UPDATE`` in relevant queries.
diff --git a/airflow/configuration.py b/airflow/configuration.py
index c8e19651d5..10f4cf5a70 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -594,6 +594,7 @@ class AirflowConfigParser(ConfigParser):
     def validate(self):
         self._validate_sqlite3_version()
         self._validate_enums()
+        self._validate_max_tis_per_query()
 
         for section, replacement in self.deprecated_values.items():
             for name, info in replacement.items():
@@ -615,6 +616,25 @@ class AirflowConfigParser(ConfigParser):
         self._upgrade_postgres_metastore_conn()
         self.is_validated = True
 
+    def _validate_max_tis_per_query(self) -> None:
+        """
+        Check if config ``scheduler.max_tis_per_query`` is not greater than ``core.parallelism``.
+        If not met, a warning message is printed to guide the user to correct it.
+
+        More info: https://github.com/apache/airflow/pull/32572
+        """
+        max_tis_per_query = self.getint("scheduler", "max_tis_per_query")
+        parallelism = self.getint("core", "parallelism")
+
+        if max_tis_per_query > parallelism:
+            warnings.warn(
+                f"Config scheduler.max_tis_per_query (value: {max_tis_per_query}) "
+                f"should NOT be greater than core.parallelism (value: {parallelism}). "
+                "Will now use core.parallelism as the max task instances per query "
+                "instead of specified value.",
+                UserWarning,
+            )
+
     def _upgrade_auth_backends(self):
         """
         Ensure a custom auth_backends setting contains session.
diff --git a/docs/apache-airflow/administration-and-deployment/scheduler.rst b/docs/apache-airflow/administration-and-deployment/scheduler.rst
index 1a2a41b136..06769d1595 100644
--- a/docs/apache-airflow/administration-and-deployment/scheduler.rst
+++ b/docs/apache-airflow/administration-and-deployment/scheduler.rst
@@ -368,11 +368,12 @@ However, you can also look at other 
non-performance-related scheduler configurat
   The scheduler will list and sort the DAG files to decide the parsing order.
 
 - :ref:`config:scheduler__max_tis_per_query`
-  The batch size of queries in the scheduling main loop. If this is too high, SQL query
-  performance may be impacted by complexity of query predicate, and/or excessive locking.
+  The batch size of queries in the scheduling main loop. This should not be greater than
+  ``core.parallelism``. If this is too high then SQL query performance may be impacted by
+  complexity of query predicate, and/or excessive locking.
 
   Additionally, you may hit the maximum allowable query length for your db.
-  Set this to 0 for no limit (not advised).
+  Set this to 0 to use the value of ``core.parallelism``.
 
 - :ref:`config:scheduler__min_file_process_interval`
   Number of seconds after which a DAG file is re-parsed. The DAG file is parsed every
diff --git a/newsfragments/32572.significant.rst b/newsfragments/32572.significant.rst
new file mode 100644
index 0000000000..3105e98315
--- /dev/null
+++ b/newsfragments/32572.significant.rst
@@ -0,0 +1,10 @@
+The default value for ``scheduler.max_tis_per_query`` is changed from 512 to 16.
+
+This change is expected to make the Scheduler more responsive.
+
+``scheduler.max_tis_per_query`` needs to be lower than ``core.parallelism``.
+If both were left to their default value previously, the effective default value of ``scheduler.max_tis_per_query`` was 32
+(because it was capped at ``core.parallelism``).
+
+To keep the behavior as close as possible to the old config, one can set ``scheduler.max_tis_per_query = 0``,
+in which case it'll always use the value of ``core.parallelism``.
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index aa7820cf37..ecb01f5bee 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -78,6 +78,9 @@ def parameterized_config(template) -> str:
         "AIRFLOW__TESTSECTION__TESTPERCENT": "with%percent",
         "AIRFLOW__TESTCMDENV__ITSACOMMAND_CMD": 'echo -n "OK"',
         "AIRFLOW__TESTCMDENV__NOTACOMMAND_CMD": 'echo -n "NOT OK"',
+        # also set minimum conf values required to pass validation
+        "AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY": "16",
+        "AIRFLOW__CORE__PARALLELISM": "32",
     },
 )
 class TestConf:
@@ -755,6 +758,22 @@ notacommand = OK
         )
         assert message == exception
 
+    @mock.patch.dict(
+        "os.environ",
+        {
+            "AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY": "200",
+            "AIRFLOW__CORE__PARALLELISM": "100",
+        },
+    )
+    def test_max_tis_per_query_too_high(self):
+        test_conf = AirflowConfigParser()
+
+        with pytest.warns(UserWarning) as ctx:
+            test_conf._validate_max_tis_per_query()
+
+        captured_warnings_msg = str(ctx.pop().message)
+        assert "max_tis_per_query" in captured_warnings_msg and "core.parallelism" in captured_warnings_msg
+
     def test_as_dict_works_without_sensitive_cmds(self):
         conf_materialize_cmds = conf.as_dict(display_sensitive=True, raw=True, include_cmds=True)
         conf_maintain_cmds = conf.as_dict(display_sensitive=True, raw=True, include_cmds=False)
@@ -879,6 +898,14 @@ key7 =
         assert test_conf.gettimedelta("default", "key7") is None
 
 
+@mock.patch.dict(
+    "os.environ",
+    {
+        # set minimum conf values required to pass validation
+        "AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY": "16",
+        "AIRFLOW__CORE__PARALLELISM": "32",
+    },
+)
 class TestDeprecatedConf:
     @conf_vars(
         {

Reply via email to