This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d9a805253dc Make `conf.validate` pluggable on config parser (#58188)
d9a805253dc is described below
commit d9a805253dcc9b705b538022be70be772df99c8d
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Nov 12 16:38:14 2025 +0530
Make `conf.validate` pluggable on config parser (#58188)
---
airflow-core/src/airflow/configuration.py | 30 +++++++---
airflow-core/tests/unit/core/test_configuration.py | 66 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 7 deletions(-)
diff --git a/airflow-core/src/airflow/configuration.py
b/airflow-core/src/airflow/configuration.py
index fc6f1179054..a0d03aea8df 100644
--- a/airflow-core/src/airflow/configuration.py
+++ b/airflow-core/src/airflow/configuration.py
@@ -32,7 +32,7 @@ import subprocess
import sys
import warnings
from base64 import b64encode
-from collections.abc import Generator, Iterable
+from collections.abc import Callable, Generator, Iterable
from configparser import ConfigParser, NoOptionError, NoSectionError
from contextlib import contextmanager
from copy import deepcopy
@@ -758,10 +758,29 @@ class AirflowConfigParser(ConfigParser):
self._default_values =
create_default_config_parser(self.configuration_description)
self._providers_configuration_loaded = False
- def validate(self):
- self._validate_sqlite3_version()
- self._validate_enums()
+ @property
+ def _validators(self) -> list[Callable[[], None]]:
+ """
+ Return list of validators defined on a config parser class.
+
+ Subclasses can override this to customize the validators that are run
during validation on the
+ config parser instance.
+ """
+ return [
+ self._validate_sqlite3_version,
+ self._validate_enums,
+ self._validate_deprecated_values,
+ self._upgrade_postgres_metastore_conn,
+ ]
+ def validate(self) -> None:
+ """Run all registered validators."""
+ for validator in self._validators:
+ validator()
+ self.is_validated = True
+
+ def _validate_deprecated_values(self) -> None:
+ """Validate and upgrade deprecated default values."""
for section, replacement in self.deprecated_values.items():
for name, info in replacement.items():
old, new = info
@@ -777,9 +796,6 @@ class AirflowConfigParser(ConfigParser):
new_value=new_value,
)
- self._upgrade_postgres_metastore_conn()
- self.is_validated = True
-
def _upgrade_postgres_metastore_conn(self):
"""
Upgrade SQL schemas.
diff --git a/airflow-core/tests/unit/core/test_configuration.py
b/airflow-core/tests/unit/core/test_configuration.py
index 8aa1d3aa0a4..ae282e618eb 100644
--- a/airflow-core/tests/unit/core/test_configuration.py
+++ b/airflow-core/tests/unit/core/test_configuration.py
@@ -1105,6 +1105,72 @@ key7 =
for key, value in expected_backend_kwargs.items():
assert getattr(secrets_backend, key) == value
+ def test_validate_sets_is_validated_flag(self):
+ """Test that validate() sets is_validated to True."""
+ test_conf = AirflowConfigParser(default_config="")
+ assert test_conf.is_validated is False
+ test_conf.validate()
+ assert test_conf.is_validated is True
+
+ def test_validators_can_be_overridden_in_subclass(self):
+ """Test that subclasses can override _validators to customize
validation."""
+
+ class CustomConfigParser(AirflowConfigParser):
+ @property
+ def _validators(self):
+ return [self._validate_enums]
+
+ test_conf = CustomConfigParser(default_config="")
+ validators = test_conf._validators
+ assert len(validators) == 1
+ assert validators[0].__name__ == "_validate_enums"
+
+ def test_validate_exception_is_handled(self):
+ """Test that exceptions from validators bubble up and is_validated
remains False."""
+
+ class FailingValidatorsParser(AirflowConfigParser):
+ def failing_validator(self):
+ raise AirflowConfigException("Test validator failure")
+
+ @property
+ def _validators(self):
+ return [self.failing_validator]
+
+ failing_conf = FailingValidatorsParser(default_config="")
+ assert failing_conf.is_validated is False
+
+ with pytest.raises(AirflowConfigException, match="Test validator
failure"):
+ failing_conf.validate()
+ assert failing_conf.is_validated is False
+
+ def test_validate_is_idempotent(self):
+ """Test that running validate() multiple times is idempotent."""
+ test_conf = AirflowConfigParser(default_config="")
+
+ test_conf.deprecated_values = {
+ "core": {"executor": (re.compile(re.escape("SequentialExecutor")),
"LocalExecutor")},
+ }
+ test_conf.read_dict({"core": {"executor": "SequentialExecutor"}})
+
+ # first pass at validation
+ test_conf.validate()
+ assert test_conf.is_validated is True
+ first_upgraded_values = dict(test_conf.upgraded_values)
+
+ # deprecated value should be upgraded
+ assert ("core", "executor") in first_upgraded_values
+ assert first_upgraded_values[("core", "executor")] ==
"SequentialExecutor"
+
+ # second pass at validation
+ test_conf.validate()
+ assert test_conf.is_validated is True
+
+ # previously upgraded values should remain the same, no re-upgrade
+ second_upgraded_values = dict(test_conf.upgraded_values)
+ assert first_upgraded_values == second_upgraded_values
+
+ assert test_conf.get("core", "executor") == "LocalExecutor"
+
@mock.patch.dict(
"os.environ",