amoghrajesh commented on code in PR #64209:
URL: https://github.com/apache/airflow/pull/64209#discussion_r2999839191
##########
shared/configuration/src/airflow_shared/configuration/parser.py:
##########
@@ -327,7 +390,7 @@ def _get_option_from_provider_metadata_config_fallbacks(
) -> str | ValueNotFound:
"""Get config option from provider metadata fallback defaults."""
value =
self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs)
- if value is not None:
+ if value is not VALUE_NOT_FOUND_SENTINEL:
return value
return VALUE_NOT_FOUND_SENTINEL
Review Comment:
Just noticed this, shouldn't it be inverted?
##########
airflow-core/tests/unit/core/test_configuration.py:
##########
@@ -1876,50 +1881,191 @@ def test_sensitive_values():
@skip_if_force_lowest_dependencies_marker
-def test_restore_and_reload_provider_configuration():
+def test_provider_configuration_toggle_with_context_manager():
+ """Test that make_sure_configuration_loaded toggles provider config
on/off."""
from airflow.settings import conf
- assert conf.providers_configuration_loaded is True
+ assert conf._use_providers_configuration is True
+ # With providers enabled, the provider value is returned via the fallback
lookup chain.
assert conf.get("celery", "celery_app_name") ==
"airflow.providers.celery.executors.celery_executor"
- conf.restore_core_default_configuration()
- assert conf.providers_configuration_loaded is False
- # built-in pre-2-7 celery executor
- assert conf.get("celery", "celery_app_name") ==
"airflow.executors.celery_executor"
- conf.load_providers_configuration()
- assert conf.providers_configuration_loaded is True
+
+ with conf.make_sure_configuration_loaded(with_providers=False):
+ assert conf._use_providers_configuration is False
+ with pytest.raises(
+ AirflowConfigException,
+ match=re.escape("section/key [celery/celery_app_name] not found in
config"),
+ ):
+ conf.get("celery", "celery_app_name")
+ # After the context manager exits, provider config is restored.
+ assert conf._use_providers_configuration is True
assert conf.get("celery", "celery_app_name") ==
"airflow.providers.celery.executors.celery_executor"
@skip_if_force_lowest_dependencies_marker
-def test_error_when_contributing_to_existing_section():
+def test_provider_sections_do_not_overlap_with_core():
+ """Test that provider config sections don't overlap with core
configuration sections."""
from airflow.settings import conf
- with conf.make_sure_configuration_loaded(with_providers=True):
- assert conf.providers_configuration_loaded is True
- assert conf.get("celery", "celery_app_name") ==
"airflow.providers.celery.executors.celery_executor"
- conf.restore_core_default_configuration()
- assert conf.providers_configuration_loaded is False
- conf.configuration_description["celery"] = {
- "description": "Celery Executor configuration",
- "options": {
- "celery_app_name": {
- "default": "test",
- }
- },
- }
- conf._default_values.add_section("celery")
- conf._default_values.set("celery", "celery_app_name", "test")
- assert conf.get("celery", "celery_app_name") == "test"
- # patching restoring_core_default_configuration to avoid reloading the
defaults
- with patch.object(conf, "restore_core_default_configuration"):
+ core_sections = set(conf._configuration_description.keys())
+ provider_sections =
set(conf._provider_metadata_configuration_description.keys())
+ overlap = core_sections & provider_sections
+ assert not overlap, (
+ f"Provider configuration sections overlap with core sections:
{overlap}. "
+ "Providers must only add new sections, not contribute to existing
ones."
+ )
+
+
+@skip_if_force_lowest_dependencies_marker
+class TestProviderConfigPriority:
+ """Tests that conf.get and conf.has_option respect provider metadata and
cfg fallbacks with correct priority.
+
+ Note: tests that use conf_vars must come AFTER tests that check
make_sure_configuration_loaded,
+ because conf_vars restores provider-sourced values into the config-file
layer, which then
+ persists even when providers are disabled.
Review Comment:
We don't guarantee class method execution order, do we? This will be flaky on
parallel runs or if tests are reordered. Consider using `pytest.mark.order` if
the order really matters.
##########
shared/configuration/src/airflow_shared/configuration/parser.py:
##########
@@ -1968,28 +2010,23 @@ def make_sure_configuration_loaded(self,
with_providers: bool) -> Generator[None
"""
Make sure configuration is loaded with or without providers.
- This happens regardless if the provider configuration has been loaded
before or not.
- Restores configuration to the state before entering the context.
+ The context manager will only toggle the
`self._use_providers_configuration` flag if `with_providers` is False, and will
reset `self._use_providers_configuration` to True after the context block.
+ Nop for `with_providers=True` as the configuration already loads
providers configuration by default.
:param with_providers: whether providers should be loaded
"""
- needs_reload = False
- if with_providers:
- self._ensure_providers_config_loaded()
- else:
- needs_reload = self._ensure_providers_config_unloaded()
- yield
- if needs_reload:
- self._reload_provider_configs()
-
- def _ensure_providers_config_loaded(self) -> None:
- """Ensure providers configurations are loaded."""
- raise NotImplementedError("Subclasses must implement
_ensure_providers_config_loaded method")
-
- def _ensure_providers_config_unloaded(self) -> bool:
- """Ensure providers configurations are unloaded temporarily to load
core configs. Returns True if providers get unloaded."""
- raise NotImplementedError("Subclasses must implement
_ensure_providers_config_unloaded method")
-
- def _reload_provider_configs(self) -> None:
- """Reload providers configuration."""
- raise NotImplementedError("Subclasses must implement
_reload_provider_configs method")
+ if not with_providers:
+ self._use_providers_configuration = False
+ # Only invalidate cached properties that depend on
_use_providers_configuration.
+ # Do NOT use invalidate_cache() here — it would also evict
expensive provider-discovery
+ # caches (_provider_metadata_configuration_description,
_provider_metadata_config_fallback_default_values)
+ # that don't depend on this flag.
+ self.__dict__.pop("configuration_description", None)
+ self.__dict__.pop("sensitive_config_values", None)
Review Comment:
We are manually popping from `__dict__` conditionally. Although it works, it
seems hacky; maybe add a helper like `_invalidate_provider_flag_caches()`
that centralises all the pops.
##########
shared/configuration/src/airflow_shared/configuration/parser.py:
##########
@@ -1185,56 +1266,6 @@ def _resolve_deprecated_lookup(
return section, key, deprecated_section, deprecated_key,
warning_emitted
- def load_providers_configuration(self) -> None:
- """
- Load configuration for providers.
-
- This should be done after initial configuration have been performed.
Initializing and discovering
- providers is an expensive operation and cannot be performed when we
load configuration for the first
- time when airflow starts, because we initialize configuration very
early, during importing of the
- `airflow` package and the module is not yet ready to be used when it
happens and until configuration
- and settings are loaded. Therefore, in order to reload provider
configuration we need to additionally
- load provider - specific configuration.
- """
- log.debug("Loading providers configuration")
-
- self.restore_core_default_configuration()
- for provider, config in
self._provider_manager_type().already_initialized_provider_configs:
- for provider_section, provider_section_content in config.items():
- provider_options = provider_section_content["options"]
- section_in_current_config =
self.configuration_description.get(provider_section)
- if not section_in_current_config:
- self.configuration_description[provider_section] =
deepcopy(provider_section_content)
- section_in_current_config =
self.configuration_description.get(provider_section)
- section_in_current_config["source"] = f"default-{provider}"
- for option in provider_options:
- section_in_current_config["options"][option]["source"]
= f"default-{provider}"
- else:
- section_source = section_in_current_config.get("source",
"Airflow's core package").split(
- "default-"
- )[-1]
- raise AirflowConfigException(
- f"The provider {provider} is attempting to contribute "
- f"configuration section {provider_section} that "
- f"has already been added before. The source of it:
{section_source}. "
- "This is forbidden. A provider can only add new
sections. It "
- "cannot contribute options to existing sections or
override other "
- "provider's configuration.",
- UserWarning,
- )
- self._default_values =
self._create_default_config_parser_callable(self.configuration_description)
- # Cached properties derived from configuration_description (e.g.
sensitive_config_values) need
- # to be recomputed now that provider config has been merged in.
- self.invalidate_cache()
- self._providers_configuration_loaded = True
-
- def restore_core_default_configuration(self) -> None:
- """Restore the parser state before provider-contributed sections were
loaded."""
- self.configuration_description =
deepcopy(self._base_configuration_description)
- self._default_values =
self._create_default_config_parser_callable(self.configuration_description)
- self.invalidate_cache()
- self._providers_configuration_loaded = False
Review Comment:
It shouldn't be the case, but these are public API. Anything/anyone calling
these will break; I suggest going through a deprecation cycle instead.
##########
shared/configuration/src/airflow_shared/configuration/parser.py:
##########
@@ -279,24 +279,77 @@ def _lookup_sequence(self) -> list[Callable]:
self._get_option_from_provider_metadata_config_fallbacks,
]
- def _get_config_sources_for_as_dict(self) -> list[tuple[str,
ConfigParser]]:
+ @functools.cached_property
+ def configuration_description(self) -> dict[str, dict[str, Any]]:
+ """
+ Return configuration description from multiple sources.
+
+ Respects the ``_use_providers_configuration`` flag to decide whether
to include
+ provider configuration.
+
+ The priority order is as follows (later sources override earlier ones):
+
+ 1. The base configuration description provided in ``__init__``,
usually loaded
+ from ``config.yml`` in core.
+ 2. ``_provider_cfg_config_fallback_default_values``, loaded from
+ ``provider_config_fallback_defaults.cfg``.
+ 3. ``_provider_metadata_config_fallback_default_values``, loaded from
provider
+ packages' ``get_provider_info`` method (via ProvidersManager /
+ RuntimeProvidersManager's ``.provider_configs`` property).
+
+ We use ``cached_property`` to cache the merged result; clear this
cache (via
+ ``invalidate_cache``) when toggling ``_use_providers_configuration``.
+ """
+ if not self._use_providers_configuration:
+ return self._configuration_description
+
+ merged_description: dict[str, dict[str, Any]] =
deepcopy(self._configuration_description)
+
+ # Merge full provider config descriptions (with metadata like
sensitive, description, etc.)
+ # from provider packages' get_provider_info method, reusing the cached
raw dict.
+ for section, section_content in
self._provider_metadata_configuration_description.items():
+ if section not in merged_description:
+ merged_description[section] = deepcopy(section_content)
+ else:
+ existing_options =
merged_description[section].setdefault("options", {})
+ for option, option_content in section_content.get("options",
{}).items():
+ if option not in existing_options:
+ existing_options[option] = deepcopy(option_content)
+
+ # Merge default values from cfg-based fallbacks (key=value only, no
metadata).
+ # Uses setdefault so provider metadata values above take priority.
+ cfg = self._provider_cfg_config_fallback_default_values
+ for section in cfg.sections():
+ section_options = merged_description.setdefault(section,
{"options": {}}).setdefault(
+ "options", {}
+ )
+ for option in cfg.options(section):
+ opt_dict = section_options.setdefault(option, {})
+ opt_dict.setdefault("default", cfg.get(section, option))
+ # For cfg-only options with no provider metadata, infer
sensitivity from name.
+ if "sensitive" not in opt_dict and
option.endswith(("password", "secret")):
+ opt_dict["sensitive"] = True
Review Comment:
But the secrets masker has to know if it's sensitive? Some field names might
not be deemed sensitive from the name alone, like `remote_task_handler_kwargs`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]