This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 15d42b4320 Reduce default for max TIs per query, enforce <=
parallelism (#32572)
15d42b4320 is described below
commit 15d42b4320d535cf54743929f134e36f59c615bb
Author: Raphaël Vandon <[email protected]>
AuthorDate: Thu Jul 20 13:29:20 2023 -0700
Reduce default for max TIs per query, enforce <= parallelism (#32572)
Co-authored-by: Tzu-ping Chung <[email protected]>
Co-authored-by: Simon Liu <[email protected]>
Co-authored-by: lyso <[email protected]>
Co-authored-by: root <root@GamePC>
---
airflow/config_templates/config.yml | 5 ++--
airflow/configuration.py | 20 ++++++++++++++++
.../administration-and-deployment/scheduler.rst | 7 +++---
newsfragments/32572.significant.rst | 10 ++++++++
tests/core/test_configuration.py | 27 ++++++++++++++++++++++
5 files changed, 64 insertions(+), 5 deletions(-)
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index 0629cb5d11..fbaac698b0 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2463,14 +2463,15 @@ scheduler:
max_tis_per_query:
description: |
This changes the batch size of queries in the scheduling main loop.
+ This should not be greater than ``core.parallelism``.
If this is too high, SQL query performance may be impacted by
complexity of query predicate, and/or excessive locking.
Additionally, you may hit the maximum allowable query length for your
db.
- Set this to 0 for no limit (not advised)
+ Set this to 0 to use the value of ``core.parallelism``
version_added: ~
type: integer
example: ~
- default: "512"
+ default: "16"
use_row_level_locking:
description: |
Should the scheduler issue ``SELECT ... FOR UPDATE`` in relevant
queries.
diff --git a/airflow/configuration.py b/airflow/configuration.py
index c8e19651d5..10f4cf5a70 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -594,6 +594,7 @@ class AirflowConfigParser(ConfigParser):
def validate(self):
self._validate_sqlite3_version()
self._validate_enums()
+ self._validate_max_tis_per_query()
for section, replacement in self.deprecated_values.items():
for name, info in replacement.items():
@@ -615,6 +616,25 @@ class AirflowConfigParser(ConfigParser):
self._upgrade_postgres_metastore_conn()
self.is_validated = True
+ def _validate_max_tis_per_query(self) -> None:
+ """
+ Check if config ``scheduler.max_tis_per_query`` is not greater than
``core.parallelism``.
+ If not met, a warning message is printed to guide the user to correct
it.
+
+ More info: https://github.com/apache/airflow/pull/32572
+ """
+ max_tis_per_query = self.getint("scheduler", "max_tis_per_query")
+ parallelism = self.getint("core", "parallelism")
+
+ if max_tis_per_query > parallelism:
+ warnings.warn(
+ f"Config scheduler.max_tis_per_query (value: {max_tis_per_query}) "
+ f"should NOT be greater than core.parallelism (value: {parallelism}). "
+ "Will now use core.parallelism as the max task instances per query "
+ "instead of specified value.",
+ UserWarning,
+ )
+
def _upgrade_auth_backends(self):
"""
Ensure a custom auth_backends setting contains session.
diff --git a/docs/apache-airflow/administration-and-deployment/scheduler.rst
b/docs/apache-airflow/administration-and-deployment/scheduler.rst
index 1a2a41b136..06769d1595 100644
--- a/docs/apache-airflow/administration-and-deployment/scheduler.rst
+++ b/docs/apache-airflow/administration-and-deployment/scheduler.rst
@@ -368,11 +368,12 @@ However, you can also look at other
non-performance-related scheduler configurat
The scheduler will list and sort the DAG files to decide the parsing order.
- :ref:`config:scheduler__max_tis_per_query`
- The batch size of queries in the scheduling main loop. If this is too high,
SQL query
- performance may be impacted by complexity of query predicate, and/or
excessive locking.
+ The batch size of queries in the scheduling main loop. This should not be
greater than
+ ``core.parallelism``. If this is too high then SQL query performance may be
impacted by
+ complexity of query predicate, and/or excessive locking.
Additionally, you may hit the maximum allowable query length for your db.
- Set this to 0 for no limit (not advised).
+ Set this to 0 to use the value of ``core.parallelism``.
- :ref:`config:scheduler__min_file_process_interval`
Number of seconds after which a DAG file is re-parsed. The DAG file is
parsed every
diff --git a/newsfragments/32572.significant.rst
b/newsfragments/32572.significant.rst
new file mode 100644
index 0000000000..3105e98315
--- /dev/null
+++ b/newsfragments/32572.significant.rst
@@ -0,0 +1,10 @@
+The default value for ``scheduler.max_tis_per_query`` is changed from 512 to
16.
+
+This change is expected to make the Scheduler more responsive.
+
+``scheduler.max_tis_per_query`` needs to be lower than or equal to ``core.parallelism``.
+If both were left to their default value previously, the effective default
value of ``scheduler.max_tis_per_query`` was 32
+(because it was capped at ``core.parallelism``).
+
+To keep the behavior as close as possible to the old config, one can set
``scheduler.max_tis_per_query = 0``,
+in which case it'll always use the value of ``core.parallelism``.
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index aa7820cf37..ecb01f5bee 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -78,6 +78,9 @@ def parameterized_config(template) -> str:
"AIRFLOW__TESTSECTION__TESTPERCENT": "with%percent",
"AIRFLOW__TESTCMDENV__ITSACOMMAND_CMD": 'echo -n "OK"',
"AIRFLOW__TESTCMDENV__NOTACOMMAND_CMD": 'echo -n "NOT OK"',
+ # also set minimum conf values required to pass validation
+ "AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY": "16",
+ "AIRFLOW__CORE__PARALLELISM": "32",
},
)
class TestConf:
@@ -755,6 +758,22 @@ notacommand = OK
)
assert message == exception
+ @mock.patch.dict(
+ "os.environ",
+ {
+ "AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY": "200",
+ "AIRFLOW__CORE__PARALLELISM": "100",
+ },
+ )
+ def test_max_tis_per_query_too_high(self):
+ test_conf = AirflowConfigParser()
+
+ with pytest.warns(UserWarning) as ctx:
+ test_conf._validate_max_tis_per_query()
+
+ captured_warnings_msg = str(ctx.pop().message)
+ assert "max_tis_per_query" in captured_warnings_msg and "core.parallelism" in captured_warnings_msg
+
def test_as_dict_works_without_sensitive_cmds(self):
conf_materialize_cmds = conf.as_dict(display_sensitive=True, raw=True,
include_cmds=True)
conf_maintain_cmds = conf.as_dict(display_sensitive=True, raw=True,
include_cmds=False)
@@ -879,6 +898,14 @@ key7 =
assert test_conf.gettimedelta("default", "key7") is None
+@mock.patch.dict(
+ "os.environ",
+ {
+ # set minimum conf values required to pass validation
+ "AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY": "16",
+ "AIRFLOW__CORE__PARALLELISM": "32",
+ },
+)
class TestDeprecatedConf:
@conf_vars(
{