This is an automated email from the ASF dual-hosted git repository.
jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2f6dbffe81d Make TaskSDK conf respect default config from provider
metadata (#62696)
2f6dbffe81d is described below
commit 2f6dbffe81d659f0d2d2cf1950c63d3241a4119c
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Mon Mar 23 16:54:27 2026 +0800
Make TaskSDK conf respect default config from provider metadata (#62696)
* Add lazy initialization for provider configs in
ProvidersManagerTaskRuntime
* Add common load_providers_configuration in shared lib
* Add provider metadata config fallback lookup in core conf
Fix _provider_metadata_config_fallback_default_values
* Add provider lookup and related utils to shared
* Add missing provider_configs property in ProvidersManagerTaskRuntime
* Address all the comments from Kaxil
- remove _get_config_sources_for_as_dict in child class
- clarify cached_property and constructing ProvidersManagerTaskRuntime
in SDK conf
- _get_option_from_provider_metadata_config_fallbacks nit
- ProvidersManager lazy import
- provider.data.get("config") nit
- remove print in prek check
* Fix unit tests
* Fix TestDeprecatedConf tests failing when run in isolation
The 6 tests in TestDeprecatedConf that use cmd/secret config sources
failed when running only TestDeprecatedConf (not the full file) because
sensitive_config_values -- a cached_property on the config parser -- was
computed during airflow initialization before the test module added its
deprecated_options entries. The stale cache meant _include_commands and
_include_secrets never processed ('core', 'sql_alchemy_conn').
Invalidate both sensitive_config_values and inversed_deprecated_options
caches right after modifying deprecated_options at module level.
Co-authored-by: Cursor Agent <[email protected]>
Co-authored-by: Jason(Zhe-You) Liu <[email protected]>
* Respect worker_mode of _get_custom_secret_backend for conf
Fix _get_custom_secret_backend
* Fetch all the conf in constructor of SchedulerJobRunner to avoid hitting
prohibit_commit block
* Fix test_execute_task_instances_unlimited_multiple_executors
- We need the conf_vars block at constructor level as we will pre-fetch
all the conf at SchedulerJobRunner constructor
* Fix otel integration test
* Resolve part of Amogh's comments
- Add invalidate_cache method at shared Parser
- Unified initialize_providers_configuration for Core & SDK
provider_manager
- call conf.initialize_providers_configuration for both provider_manager
- Replace manual clear cache with invalidate_cache method in
test_configuration
* Resolve Kaxil's second review comments
* Fix test_write_default_config_contains_generated_secrets error
* Address Copilot's comments
* Address second phase of Copilot's comments
---------
Co-authored-by: Cursor Agent <[email protected]>
Co-authored-by: Jason(Zhe-You) Liu <[email protected]>
---
airflow-core/src/airflow/configuration.py | 143 ++------------
.../src/airflow/jobs/scheduler_job_runner.py | 26 ++-
airflow-core/src/airflow/providers_manager.py | 23 +--
airflow-core/tests/integration/otel/test_otel.py | 5 +
airflow-core/tests/unit/core/test_configuration.py | 3 +
airflow-core/tests/unit/jobs/test_scheduler_job.py | 10 +-
scripts/ci/prek/check_airflow_imports_in_shared.py | 23 +++
.../src/airflow_shared/configuration/parser.py | 208 +++++++++++++++++++--
.../tests/configuration/test_parser.py | 122 +++++++++++-
task-sdk/src/airflow/sdk/configuration.py | 19 +-
.../src/airflow/sdk/providers_manager_runtime.py | 33 ++++
.../task_sdk/test_providers_manager_runtime.py | 35 ++++
12 files changed, 478 insertions(+), 172 deletions(-)
diff --git a/airflow-core/src/airflow/configuration.py
b/airflow-core/src/airflow/configuration.py
index 69d936be01d..1426199c7e0 100644
--- a/airflow-core/src/airflow/configuration.py
+++ b/airflow-core/src/airflow/configuration.py
@@ -16,7 +16,6 @@
# under the License.
from __future__ import annotations
-import contextlib
import logging
import os
import pathlib
@@ -29,7 +28,6 @@ import warnings
from base64 import b64encode
from collections.abc import Callable
from configparser import ConfigParser
-from copy import deepcopy
from inspect import ismodule
from io import StringIO
from re import Pattern
@@ -39,9 +37,7 @@ from urllib.parse import urlsplit
from typing_extensions import overload
from airflow._shared.configuration.parser import (
- VALUE_NOT_FOUND_SENTINEL,
AirflowConfigParser as _SharedAirflowConfigParser,
- ValueNotFound,
configure_parser_from_configuration_description,
)
from airflow._shared.module_loading import import_string
@@ -207,10 +203,19 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
# interpolation placeholders. The _default_values config parser will
interpolate them
# properly when we call get() on it.
_default_values =
create_default_config_parser(configuration_description)
- super().__init__(configuration_description, _default_values, *args,
**kwargs)
+ from airflow.providers_manager import ProvidersManager
+
+ super().__init__(
+ configuration_description,
+ _default_values,
+ ProvidersManager,
+ create_default_config_parser,
+ _default_config_file_path("provider_config_fallback_defaults.cfg"),
+ *args,
+ **kwargs,
+ )
self.configuration_description = configuration_description
self._default_values = _default_values
- self._provider_config_fallback_default_values =
create_provider_config_fallback_defaults()
if default_config is not None:
self._update_defaults_from_string(default_config)
self._update_logging_deprecated_template_to_one_from_defaults()
@@ -228,35 +233,6 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
self._upgrade_postgres_metastore_conn,
]
- @property
- def _lookup_sequence(self) -> list[Callable]:
- """Overring _lookup_sequence from shared base class to add provider
fallbacks."""
- return super()._lookup_sequence +
[self._get_option_from_provider_fallbacks]
-
- def _get_config_sources_for_as_dict(self) -> list[tuple[str,
ConfigParser]]:
- """Override the base method to add provider fallbacks."""
- return [
- ("provider-fallback-defaults",
self._provider_config_fallback_default_values),
- ("default", self._default_values),
- ("airflow.cfg", self),
- ]
-
- def _get_option_from_provider_fallbacks(
- self,
- deprecated_key: str | None,
- deprecated_section: str | None,
- key: str,
- section: str,
- issue_warning: bool = True,
- extra_stacklevel: int = 0,
- **kwargs,
- ) -> str | ValueNotFound:
- """Get config option from provider fallback defaults."""
- if self.get_provider_config_fallback_defaults(section, key) is not
None:
- # no expansion needed
- return self.get_provider_config_fallback_defaults(section, key,
**kwargs)
- return VALUE_NOT_FOUND_SENTINEL
-
def _update_logging_deprecated_template_to_one_from_defaults(self):
default = self.get_default_value("logging", "log_filename_template")
if default:
@@ -267,13 +243,6 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
default,
)
- def get_provider_config_fallback_defaults(self, section: str, key: str,
**kwargs) -> Any:
- """Get provider config fallback default values."""
- # Remove team_name from kwargs as the fallback defaults ConfigParser
- # does not support team-aware lookups (it's a standard ConfigParser).
- kwargs.pop("team_name", None)
- return self._provider_config_fallback_default_values.get(section, key,
fallback=None, **kwargs)
-
# A mapping of old default values that we want to change and warn the user
# about. Mapping of section -> setting -> { old, replace }
deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {
@@ -403,7 +372,7 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
if not self._providers_configuration_loaded:
from airflow.providers_manager import ProvidersManager
- ProvidersManager()._initialize_providers_configuration()
+ ProvidersManager().initialize_providers_configuration()
def _ensure_providers_config_unloaded(self) -> bool:
"""Ensure providers configurations are unloaded temporarily to load
core configs. Returns True if providers get unloaded."""
@@ -416,17 +385,6 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
"""Reload providers configuration."""
self.load_providers_configuration()
- def restore_core_default_configuration(self) -> None:
- """
- Restore default configuration for core Airflow.
-
- It does not restore configuration for providers. If you want to
restore configuration for
- providers, you need to call ``load_providers_configuration`` method.
- """
- self.configuration_description =
retrieve_configuration_description(include_providers=False)
- self._default_values =
create_default_config_parser(self.configuration_description)
- self._providers_configuration_loaded = False
-
def _upgrade_postgres_metastore_conn(self):
"""
Upgrade SQL schemas.
@@ -490,6 +448,11 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
f"See
{get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
)
+ def _get_custom_secret_backend(self, worker_mode: bool | None = None) ->
Any | None:
+ return super()._get_custom_secret_backend(
+ worker_mode=worker_mode if worker_mode is not None else False
+ )
+
def mask_secrets(self):
from airflow._shared.configuration.parser import
_build_kwarg_env_prefix, _collect_kwarg_env_vars
from airflow._shared.secrets_masker import mask_secret as
mask_secret_core
@@ -567,52 +530,6 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
"""Checks if providers have been loaded."""
return self._providers_configuration_loaded
- def load_providers_configuration(self):
- """
- Load configuration for providers.
-
- This should be done after initial configuration have been performed.
Initializing and discovering
- providers is an expensive operation and cannot be performed when we
load configuration for the first
- time when airflow starts, because we initialize configuration very
early, during importing of the
- `airflow` package and the module is not yet ready to be used when it
happens and until configuration
- and settings are loaded. Therefore, in order to reload provider
configuration we need to additionally
- load provider - specific configuration.
- """
- log.debug("Loading providers configuration")
- from airflow.providers_manager import ProvidersManager
-
- self.restore_core_default_configuration()
- for provider, config in
ProvidersManager().already_initialized_provider_configs:
- for provider_section, provider_section_content in config.items():
- provider_options = provider_section_content["options"]
- section_in_current_config =
self.configuration_description.get(provider_section)
- if not section_in_current_config:
- self.configuration_description[provider_section] =
deepcopy(provider_section_content)
- section_in_current_config =
self.configuration_description.get(provider_section)
- section_in_current_config["source"] = f"default-{provider}"
- for option in provider_options:
- section_in_current_config["options"][option]["source"]
= f"default-{provider}"
- else:
- section_source = section_in_current_config.get("source",
"Airflow's core package").split(
- "default-"
- )[-1]
- raise AirflowConfigException(
- f"The provider {provider} is attempting to contribute "
- f"configuration section {provider_section} that "
- f"has already been added before. The source of it:
{section_source}. "
- "This is forbidden. A provider can only add new
sections. It "
- "cannot contribute options to existing sections or
override other "
- "provider's configuration.",
- UserWarning,
- )
- self._default_values =
create_default_config_parser(self.configuration_description)
- # sensitive_config_values needs to be refreshed here. This is a
cached_property, so we can delete
- # the cached values, and it will be refreshed on next access.
- with contextlib.suppress(AttributeError):
- # no problem if cache is not set yet
- del self.sensitive_config_values
- self._providers_configuration_loaded = True
-
def _get_config_value_from_secret_backend(self, config_key: str) -> str |
None:
"""
Override to use module-level function that reads from global conf.
@@ -702,30 +619,6 @@ def
create_default_config_parser(configuration_description: dict[str, dict[str,
return parser
-def create_provider_config_fallback_defaults() -> ConfigParser:
- """
- Create fallback defaults.
-
- This parser contains provider defaults for Airflow configuration,
containing fallback default values
- that might be needed when provider classes are being imported - before
provider's configuration
- is loaded.
-
- Unfortunately airflow currently performs a lot of stuff during importing
and some of that might lead
- to retrieving provider configuration before the defaults for the provider
are loaded.
-
- Those are only defaults, so if you have "real" values configured in your
configuration (.cfg file or
- environment variables) those will be used as usual.
-
- NOTE!! Do NOT attempt to remove those default fallbacks thinking that they
are unnecessary duplication,
- at least not until we fix the way how airflow imports "do stuff". This is
unlikely to succeed.
-
- You've been warned!
- """
- config_parser = ConfigParser()
-
config_parser.read(_default_config_file_path("provider_config_fallback_defaults.cfg"))
- return config_parser
-
-
def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
airflow_config = pathlib.Path(AIRFLOW_CONFIG)
if airflow_config.is_dir():
@@ -754,11 +647,13 @@ def write_default_airflow_configuration_if_needed() ->
AirflowConfigParser:
conf.configuration_description["core"]["options"]["fernet_key"]["default"] = (
_SecretKeys.fernet_key
)
+ conf._default_values.set("core", "fernet_key",
_SecretKeys.fernet_key)
_SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
conf.configuration_description["api_auth"]["options"]["jwt_secret"]["default"]
= (
_SecretKeys.jwt_secret_key
)
+ conf._default_values.set("api_auth", "jwt_secret",
_SecretKeys.jwt_secret_key)
pathlib.Path(airflow_config.__fspath__()).touch()
make_group_other_inaccessible(airflow_config.__fspath__())
with open(airflow_config, "w") as file:
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index c64bd166f1b..bda95bbf4b0 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -271,6 +271,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.num_runs = num_runs
self.only_idle = only_idle
self._scheduler_idle_sleep_time = scheduler_idle_sleep_time
+
+ # Note:
+ # We need to fetch all conf values before the `prohibit_commit` block;
otherwise the Core conf may
+ # access the MetadataMetastoreBackend and trigger `UNEXPECTED COMMIT -
THIS WILL BREAK HA LOCKS`.
+ # The easiest way to keep the scheduler loop side-effect free is to
read those values in `__init__`.
+
# How many seconds do we wait for tasks to heartbeat before timeout.
self._task_instance_heartbeat_timeout_secs = conf.getint(
"scheduler", "task_instance_heartbeat_timeout"
@@ -284,6 +290,9 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
key="num_stuck_in_queued_retries",
fallback=2,
)
+ self._scheduler_use_job_schedule = conf.getboolean("scheduler",
"use_job_schedule", fallback=True)
+ self._parallelism = conf.getint("core", "parallelism")
+ self._multi_team = conf.getboolean("core", "multi_team")
self.executors: list[BaseExecutor] = executors if executors else
ExecutorLoader.init_executors()
self.executor: BaseExecutor = self.executors[0]
@@ -656,7 +665,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.log.info("%s tasks up for execution:\n%s",
len(task_instances_to_examine), task_instance_str)
dag_id_to_team_name: dict[str, str | None] = {}
- if conf.getboolean("core", "multi_team"):
+ if self._multi_team:
# Batch query to resolve team names for all DAG IDs to
optimize performance
# Instead of individual queries in _try_to_load_executor(),
resolve all team names upfront
unique_dag_ids = {ti.dag_id for ti in
task_instances_to_examine}
@@ -1005,11 +1014,10 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# we need to make sure in the scheduler now that we don't schedule
more than core.parallelism totally
# across all executors.
num_occupied_slots = sum([executor.slots_occupied for executor in
self.executors])
- parallelism = conf.getint("core", "parallelism")
if self.job.max_tis_per_query == 0:
- max_tis = parallelism - num_occupied_slots
+ max_tis = self._parallelism - num_occupied_slots
else:
- max_tis = min(self.job.max_tis_per_query, parallelism -
num_occupied_slots)
+ max_tis = min(self.job.max_tis_per_query, self._parallelism -
num_occupied_slots)
if max_tis <= 0:
self.log.debug("max_tis query size is less than or equal to zero.
No query will be performed!")
return 0
@@ -1039,7 +1047,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
:param session: The database session
"""
num_occupied_slots = sum(executor.slots_occupied for executor in
self.executors)
- max_callbacks = conf.getint("core", "parallelism") - num_occupied_slots
+ max_callbacks = self._parallelism - num_occupied_slots
if max_callbacks <= 0:
self.log.debug("No available slots for callbacks; all executors at
capacity")
@@ -1694,7 +1702,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
"""
# Put a check in place to make sure we don't commit unexpectedly
with prohibit_commit(session) as guard:
- if conf.getboolean("scheduler", "use_job_schedule", fallback=True):
+ if self._scheduler_use_job_schedule:
self._create_dagruns_for_dags(guard, session)
self._start_queued_dagruns(session)
@@ -2874,7 +2882,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
def _purge_task_instances_without_heartbeats(
self, task_instances_without_heartbeats: list[TI], *, session: Session
) -> None:
- if conf.getboolean("core", "multi_team"):
+ if self._multi_team:
unique_dag_ids = {ti.dag_id for ti in
task_instances_without_heartbeats}
dag_id_to_team_name =
self._get_team_names_for_dag_ids(unique_dag_ids, session)
else:
@@ -3122,7 +3130,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
) -> dict[BaseExecutor, list[SchedulerWorkload]]:
"""Organize workloads into lists per their respective executor."""
workloads_iter: Iterable[SchedulerWorkload]
- if conf.getboolean("core", "multi_team"):
+ if self._multi_team:
if dag_id_to_team_name is None:
if isinstance(workloads, list):
workloads_list = workloads
@@ -3170,7 +3178,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
will query the database to resolve team name. None
indicates global team.
"""
executor = None
- if conf.getboolean("core", "multi_team"):
+ if self._multi_team:
# Use provided team_name if available, otherwise query the database
if team_name is NOTSET:
team_name = self._get_workload_team_name(workload, session)
diff --git a/airflow-core/src/airflow/providers_manager.py
b/airflow-core/src/airflow/providers_manager.py
index a28f1bd9274..02f5a6957c9 100644
--- a/airflow-core/src/airflow/providers_manager.py
+++ b/airflow-core/src/airflow/providers_manager.py
@@ -610,23 +610,10 @@ class ProvidersManager(LoggingMixin):
@provider_info_cache("config")
def initialize_providers_configuration(self):
- """Lazy initialization of providers configuration information."""
- self._initialize_providers_configuration()
-
- def _initialize_providers_configuration(self):
- """
- Initialize providers configuration information.
-
- Should be used if we do not want to trigger caching for
``initialize_providers_configuration`` method.
- In some cases we might want to make sure that the configuration is
initialized, but we do not want
- to cache the initialization method - for example when we just want to
write configuration with
- providers, but it is used in the context where no providers are loaded
yet we will eventually
- restore the original configuration and we want the subsequent
``initialize_providers_configuration``
- method to be run in order to load the configuration for providers
again.
- """
+ """Lazy initialization of provider configuration metadata and merge it
into ``conf``."""
self.initialize_providers_list()
self._discover_config()
- # Now update conf with the new provider configuration from providers
+ # Imported lazily to avoid a configuration/providers_manager import
cycle during startup.
from airflow.configuration import conf
conf.load_providers_configuration()
@@ -1491,6 +1478,12 @@ class ProvidersManager(LoggingMixin):
self._executor_without_check_set.clear()
self._queue_class_name_set.clear()
self._provider_configs.clear()
+
+ # Imported lazily to avoid a configuration/providers_manager import
cycle during cleanup.
+ from airflow.configuration import conf
+
+ conf.invalidate_cache()
+
self._trigger_info_set.clear()
self._notification_info_set.clear()
self._plugins_set.clear()
diff --git a/airflow-core/tests/integration/otel/test_otel.py
b/airflow-core/tests/integration/otel/test_otel.py
index c9bd4ad8b78..0d40156c45e 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -209,6 +209,11 @@ class TestOtelIntegration:
# during scheduler handoff (see
https://github.com/apache/airflow/issues/61070).
wait_for_otel_collector(otel_host, otel_port)
+ # The pytest plugin strips AIRFLOW__*__* env vars (including the JWT
secret set
+ # by Breeze). Both the scheduler and api-server subprocesses must
share the same
+ # secret; otherwise each generates its own random key and token
verification fails.
+ os.environ["AIRFLOW__API_AUTH__JWT_SECRET"] =
"test-secret-key-for-testing"
+ os.environ["AIRFLOW__API_AUTH__JWT_ISSUER"] = "airflow"
os.environ["AIRFLOW__TRACES__OTEL_ON"] = "True"
os.environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "http/protobuf"
os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] =
"http://breeze-otel-collector:4318/v1/traces"
diff --git a/airflow-core/tests/unit/core/test_configuration.py
b/airflow-core/tests/unit/core/test_configuration.py
index be0f191f8c9..9ee2f737533 100644
--- a/airflow-core/tests/unit/core/test_configuration.py
+++ b/airflow-core/tests/unit/core/test_configuration.py
@@ -64,6 +64,9 @@ conf.deprecated_options[("scheduler",
"parsing_cleanup_interval")] = (
"deactivate_stale_dags_interval",
"2.5.0",
)
+# Invalidate cached properties that depend on deprecated_options, since they
may have been
+# computed during airflow initialization before the entries above were added.
+conf.invalidate_cache()
@pytest.fixture(scope="module", autouse=True)
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 997ebd24bcf..f9ae60a9c35 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -2779,7 +2779,10 @@ class TestSchedulerJob:
task2 = EmptyOperator(task_id=task_id_2, executor=task2_exec)
scheduler_job = Job()
- self.job_runner = SchedulerJobRunner(job=scheduler_job)
+ with conf_vars({("core", "parallelism"): "40"}):
+ # 40 dag runs * 2 tasks each = 80. Two executors have capacity for
61 concurrent jobs, but they
+ # together respect core.parallelism and will not run more in
aggregate then that allows.
+ self.job_runner = SchedulerJobRunner(job=scheduler_job)
def _create_dagruns():
dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED,
state=State.RUNNING)
@@ -2804,10 +2807,7 @@ class TestSchedulerJob:
executor.slots_available = 31
total_enqueued = 0
- with conf_vars({("core", "parallelism"): "40"}):
- # 40 dag runs * 2 tasks each = 80. Two executors have capacity for
61 concurrent jobs, but they
- # together respect core.parallelism and will not run more in
aggregate then that allows.
- total_enqueued +=
self.job_runner._critical_section_enqueue_task_instances(session)
+ total_enqueued +=
self.job_runner._critical_section_enqueue_task_instances(session)
if task1_exec != task2_exec:
# Two executors will execute up to core parallelism
diff --git a/scripts/ci/prek/check_airflow_imports_in_shared.py
b/scripts/ci/prek/check_airflow_imports_in_shared.py
index 63dd94e090f..ece603ec11a 100755
--- a/scripts/ci/prek/check_airflow_imports_in_shared.py
+++ b/scripts/ci/prek/check_airflow_imports_in_shared.py
@@ -32,6 +32,26 @@ from pathlib import Path
from common_prek_utils import console
+def _is_type_checking_guard(node: ast.If) -> bool:
+ """Check if an If node is a ``TYPE_CHECKING`` guard."""
+ test = node.test
+ if isinstance(test, ast.Name) and test.id == "TYPE_CHECKING":
+ return True
+ if isinstance(test, ast.Attribute) and test.attr == "TYPE_CHECKING":
+ return True
+ return False
+
+
+def _collect_type_checking_node_ids(tree: ast.AST) -> set[int]:
+ """Return the ``id()`` of every AST node nested inside an ``if
TYPE_CHECKING`` block."""
+ guarded: set[int] = set()
+ for node in ast.walk(tree):
+ if isinstance(node, ast.If) and _is_type_checking_guard(node):
+ for child in ast.walk(node):
+ guarded.add(id(child))
+ return guarded
+
+
def check_file_for_prohibited_imports(file_path: Path) -> list[tuple[int,
str]]:
try:
source = file_path.read_text(encoding="utf-8")
@@ -39,9 +59,12 @@ def check_file_for_prohibited_imports(file_path: Path) ->
list[tuple[int, str]]:
except (OSError, UnicodeDecodeError, SyntaxError):
return []
+ type_checking_ids = _collect_type_checking_node_ids(tree)
violations = []
for node in ast.walk(tree):
+ if id(node) in type_checking_ids:
+ continue
# Check `from airflow.x import y` statements
if isinstance(node, ast.ImportFrom):
if node.module and node.module.startswith("airflow."):
diff --git a/shared/configuration/src/airflow_shared/configuration/parser.py
b/shared/configuration/src/airflow_shared/configuration/parser.py
index b06f9611a98..55f24c529e0 100644
--- a/shared/configuration/src/airflow_shared/configuration/parser.py
+++ b/shared/configuration/src/airflow_shared/configuration/parser.py
@@ -33,10 +33,11 @@ import warnings
from collections.abc import Callable, Generator, Iterable
from configparser import ConfigParser, NoOptionError, NoSectionError
from contextlib import contextmanager
+from copy import deepcopy
from enum import Enum
from json.decoder import JSONDecodeError
from re import Pattern
-from typing import IO, Any, TypeVar, overload
+from typing import IO, TYPE_CHECKING, Any, TypeVar, overload
from .exceptions import AirflowConfigException
@@ -78,6 +79,11 @@ ConfigSourcesType = dict[str, ConfigSectionSourcesType]
ENV_VAR_PREFIX = "AIRFLOW__"
+if TYPE_CHECKING:
+ from airflow.providers_manager import ProvidersManager
+ from airflow.sdk.providers_manager_runtime import
ProvidersManagerTaskRuntime
+
+
class ValueNotFound:
"""Object of this is raised when a configuration value cannot be found."""
@@ -166,6 +172,34 @@ def configure_parser_from_configuration_description(
parser.set(section, key, default_value)
+def create_provider_cfg_config_fallback_defaults(
+ provider_config_fallback_defaults_cfg_path: str,
+) -> ConfigParser:
+ """
+ Create fallback defaults for configuration.
+
+ This parser contains provider defaults for Airflow configuration,
containing fallback default values
+ that might be needed when provider classes are being imported - before
provider's configuration
+ is loaded.
+
+ Unfortunately airflow currently performs a lot of stuff during importing
and some of that might lead
+ to retrieving provider configuration before the defaults for the provider
are loaded.
+
+ Those are only defaults, so if you have "real" values configured in your
configuration (.cfg file or
+ environment variables) those will be used as usual.
+
+ NOTE!! Do NOT attempt to remove those default fallbacks thinking that they
are unnecessary duplication,
+ at least not until we fix the way how airflow imports "do stuff". This is
unlikely to succeed.
+
+ You've been warned!
+
+ :param provider_config_fallback_defaults_cfg_path: path to the provider
config fallback defaults .cfg file
+ """
+ config_parser = ConfigParser()
+ config_parser.read(provider_config_fallback_defaults_cfg_path)
+ return config_parser
+
+
class AirflowConfigParser(ConfigParser):
"""
Base configuration parser with pure parsing logic.
@@ -241,7 +275,85 @@ class AirflowConfigParser(ConfigParser):
self._get_option_from_commands,
self._get_option_from_secrets,
self._get_option_from_defaults,
+ self._get_option_from_provider_cfg_config_fallbacks,
+ self._get_option_from_provider_metadata_config_fallbacks,
+ ]
+
+ def _get_config_sources_for_as_dict(self) -> list[tuple[str,
ConfigParser]]:
+ """Override the base method to add provider fallbacks when providers
are loaded."""
+ sources: list[tuple[str, ConfigParser]] = [
+ ("default", self._default_values),
+ ("airflow.cfg", self),
]
+ if self._providers_configuration_loaded:
+ sources.insert(
+ 0,
+ (
+ "provider-metadata-fallback-defaults",
+ self._provider_metadata_config_fallback_default_values,
+ ),
+ )
+ sources.insert(
+ 0,
+ ("provider-cfg-fallback-defaults",
self._provider_cfg_config_fallback_default_values),
+ )
+ return sources
+
+ def _get_option_from_provider_cfg_config_fallbacks(
+ self,
+ deprecated_key: str | None,
+ deprecated_section: str | None,
+ key: str,
+ section: str,
+ issue_warning: bool = True,
+ extra_stacklevel: int = 0,
+ **kwargs,
+ ) -> str | ValueNotFound:
+ """Get config option from provider fallback defaults."""
+ value = self.get_from_provider_cfg_config_fallback_defaults(section,
key, **kwargs)
+ if value is not None:
+ return value
+ return VALUE_NOT_FOUND_SENTINEL
+
+ def _get_option_from_provider_metadata_config_fallbacks(
+ self,
+ deprecated_key: str | None,
+ deprecated_section: str | None,
+ key: str,
+ section: str,
+ issue_warning: bool = True,
+ extra_stacklevel: int = 0,
+ **kwargs,
+ ) -> str | ValueNotFound:
+ """Get config option from provider metadata fallback defaults."""
+ value =
self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs)
+ if value is not None:
+ return value
+ return VALUE_NOT_FOUND_SENTINEL
+
+ def get_from_provider_cfg_config_fallback_defaults(self, section: str,
key: str, **kwargs) -> Any:
+ """Get provider config fallback default values."""
+ raw = kwargs.get("raw", False)
+ vars_ = kwargs.get("vars")
+ return self._provider_cfg_config_fallback_default_values.get(
+ section, key, fallback=None, raw=raw, vars=vars_
+ )
+
+ @functools.cached_property
+ def _provider_metadata_config_fallback_default_values(self) ->
ConfigParser:
+ """Return Provider metadata config fallback default values."""
+ base_configuration_description: dict[str, dict[str, Any]] = {}
+ for _, config in self._provider_manager_type().provider_configs:
+ base_configuration_description.update(config)
+ return
self._create_default_config_parser_callable(base_configuration_description)
+
+ def get_from_provider_metadata_config_fallback_defaults(self, section:
str, key: str, **kwargs) -> Any:
+ """Get provider metadata config fallback default values."""
+ raw = kwargs.get("raw", False)
+ vars_ = kwargs.get("vars")
+ return self._provider_metadata_config_fallback_default_values.get(
+ section, key, fallback=None, raw=raw, vars=vars_
+ )
@property
def _validators(self) -> list[Callable[[], None]]:
@@ -300,6 +412,9 @@ class AirflowConfigParser(ConfigParser):
self,
configuration_description: dict[str, dict[str, Any]],
_default_values: ConfigParser,
+ provider_manager_type: type[ProvidersManager] |
type[ProvidersManagerTaskRuntime],
+ create_default_config_parser_callable: Callable[[dict[str, dict[str,
Any]]], ConfigParser],
+ provider_config_fallback_defaults_cfg_path: str,
*args,
**kwargs,
):
@@ -308,12 +423,36 @@ class AirflowConfigParser(ConfigParser):
:param configuration_description: Description of configuration options
:param _default_values: ConfigParser with default values
+ :param provider_manager_type: Either ProvidersManager or
ProvidersManagerTaskRuntime, depending on the context of the caller.
+ :param create_default_config_parser_callable: The
`create_default_config_parser` function from core or SDK, depending on the
context of the caller.
+ :param provider_config_fallback_defaults_cfg_path: Path to the
`provider_config_fallback_defaults.cfg` file.
"""
super().__init__(*args, **kwargs)
self.configuration_description = configuration_description
+ self._base_configuration_description =
deepcopy(configuration_description)
self._default_values = _default_values
+ self._provider_manager_type = provider_manager_type
+ self._create_default_config_parser_callable =
create_default_config_parser_callable
+ self._provider_cfg_config_fallback_default_values =
create_provider_cfg_config_fallback_defaults(
+ provider_config_fallback_defaults_cfg_path
+ )
self._suppress_future_warnings = False
self.upgraded_values: dict[tuple[str, str], str] = {}
+ self._providers_configuration_loaded = False
+
+ def invalidate_cache(self) -> None:
+ """
+ Clear all ``functools.cached_property`` entries on this instance.
+
+ Call this after mutating class-level attributes (e.g.
``deprecated_options``)
+ so that derived cached properties are recomputed on next access.
+ """
+ for attr_name in (
+ name
+ for name in dir(type(self))
+ if isinstance(getattr(type(self), name, None),
functools.cached_property)
+ ):
+ self.__dict__.pop(attr_name, None)
@functools.cached_property
def inversed_deprecated_options(self):
@@ -1046,6 +1185,56 @@ class AirflowConfigParser(ConfigParser):
return section, key, deprecated_section, deprecated_key,
warning_emitted
+ def load_providers_configuration(self) -> None:
+ """
+ Load configuration for providers.
+
+ This should be done after initial configuration have been performed.
Initializing and discovering
+ providers is an expensive operation and cannot be performed when we
load configuration for the first
+ time when airflow starts, because we initialize configuration very
early, during importing of the
+ `airflow` package and the module is not yet ready to be used when it
happens and until configuration
+ and settings are loaded. Therefore, in order to reload provider
configuration we need to additionally
+ load provider - specific configuration.
+ """
+ log.debug("Loading providers configuration")
+
+ self.restore_core_default_configuration()
+ for provider, config in
self._provider_manager_type().already_initialized_provider_configs:
+ for provider_section, provider_section_content in config.items():
+ provider_options = provider_section_content["options"]
+ section_in_current_config =
self.configuration_description.get(provider_section)
+ if not section_in_current_config:
+ self.configuration_description[provider_section] =
deepcopy(provider_section_content)
+ section_in_current_config =
self.configuration_description.get(provider_section)
+ section_in_current_config["source"] = f"default-{provider}"
+ for option in provider_options:
+ section_in_current_config["options"][option]["source"]
= f"default-{provider}"
+ else:
+ section_source = section_in_current_config.get("source",
"Airflow's core package").split(
+ "default-"
+ )[-1]
+ raise AirflowConfigException(
+ f"The provider {provider} is attempting to contribute "
+ f"configuration section {provider_section} that "
+ f"has already been added before. The source of it:
{section_source}. "
+ "This is forbidden. A provider can only add new
sections. It "
+ "cannot contribute options to existing sections or
override other "
+ "provider's configuration.",
+ UserWarning,
+ )
+ self._default_values =
self._create_default_config_parser_callable(self.configuration_description)
+ # Cached properties derived from configuration_description (e.g.
sensitive_config_values) need
+ # to be recomputed now that provider config has been merged in.
+ self.invalidate_cache()
+ self._providers_configuration_loaded = True
+
    def restore_core_default_configuration(self) -> None:
        """Restore the parser state before provider-contributed sections were loaded."""
        # Reset the description to the pristine copy captured at construction time so any
        # provider-contributed sections disappear.
        self.configuration_description = deepcopy(self._base_configuration_description)
        # Rebuild default values from the restored (core-only) description.
        self._default_values = self._create_default_config_parser_callable(self.configuration_description)
        # Drop cached properties derived from the old (provider-augmented) description.
        self.invalidate_cache()
        self._providers_configuration_loaded = False
+
@overload # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) ->
str: ...
@@ -1428,17 +1617,6 @@ class AirflowConfigParser(ConfigParser):
"""
return optionstr
- def _get_config_sources_for_as_dict(self) -> list[tuple[str,
ConfigParser]]:
- """
- Get list of config sources to use in as_dict().
-
- Subclasses can override to add additional sources (e.g., provider
configs).
- """
- return [
- ("default", self._default_values),
- ("airflow.cfg", self),
- ]
-
def as_dict(
self,
display_source: bool = False,
@@ -1741,10 +1919,10 @@ class AirflowConfigParser(ConfigParser):
:param extra_spacing: Add extra spacing before examples and after
variables
:param only_defaults: Only include default values when writing the
config, not the actual values
"""
- sources_dict = {}
- if include_sources:
- sources_dict = self.as_dict(display_source=True)
with
self.make_sure_configuration_loaded(with_providers=include_providers):
+ sources_dict = {}
+ if include_sources:
+ sources_dict = self.as_dict(display_source=True)
for section_to_write in self.get_sections_including_defaults():
section_config_description =
self.configuration_description.get(section_to_write, {})
if section_to_write != section and section is not None:
diff --git a/shared/configuration/tests/configuration/test_parser.py
b/shared/configuration/tests/configuration/test_parser.py
index a018a931ed5..45b8ee1fd57 100644
--- a/shared/configuration/tests/configuration/test_parser.py
+++ b/shared/configuration/tests/configuration/test_parser.py
@@ -26,6 +26,7 @@ import re
import textwrap
from configparser import ConfigParser
from enum import Enum
+from io import StringIO
from unittest.mock import patch
import pytest
@@ -37,10 +38,39 @@ from airflow_shared.configuration.parser import (
)
+class _NoOpProvidersManager:
+ """Stub providers manager for tests — no providers, no side effects."""
+
+ @property
+ def provider_configs(self):
+ return []
+
+ @property
+ def already_initialized_provider_configs(self):
+ return []
+
+
+def _create_empty_config_parser(desc: dict) -> ConfigParser:
+ return ConfigParser()
+
+
def _create_default_config_parser(desc: dict) -> ConfigParser:
    """Build a ConfigParser populated with defaults from the given configuration description."""
    parser = ConfigParser()
    # Third argument is the expansion-variables mapping; tests need no expansion, hence {}.
    configure_parser_from_configuration_description(parser, desc, {})
    return parser
+
+
class AirflowConfigParser(_SharedAirflowConfigParser):
"""Test parser that extends shared parser for testing."""
- def __init__(self, default_config: str | None = None, *args, **kwargs):
+ def __init__(
+ self,
+ default_config: str | None = None,
+ provider_manager_type=_NoOpProvidersManager,
+ create_default_config_parser_callable=_create_empty_config_parser,
+ *args,
+ **kwargs,
+ ):
configuration_description = {
"test": {
"options": {
@@ -53,7 +83,15 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
_default_values.add_section("test")
_default_values.set("test", "key1", "default_value")
_default_values.set("test", "key2", "123")
- super().__init__(configuration_description, _default_values, *args,
**kwargs)
+ super().__init__(
+ configuration_description,
+ _default_values,
+ provider_manager_type,
+ create_default_config_parser_callable,
+ "",
+ *args,
+ **kwargs,
+ )
self.configuration_description = configuration_description
self._default_values = _default_values
self._suppress_future_warnings = False
@@ -71,6 +109,22 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
for key, value in parser.items(section):
self._default_values.set(section, key, value)
+ def _ensure_providers_config_loaded(self) -> None:
+ """Load provider configuration for tests when requested."""
+ if not self._providers_configuration_loaded:
+ self.load_providers_configuration()
+
+ def _ensure_providers_config_unloaded(self) -> bool:
+ """Unload provider configuration for tests when requested."""
+ if self._providers_configuration_loaded:
+ self.restore_core_default_configuration()
+ return True
+ return False
+
    def _reload_provider_configs(self) -> None:
        """Reload provider configuration for tests after temporary unloads."""
        # Unconditional reload; pairs with _ensure_providers_config_unloaded().
        self.load_providers_configuration()
+
class TestAirflowConfigParser:
"""Test the shared AirflowConfigParser parser methods."""
@@ -782,6 +836,61 @@ existing_list = one,two,three
with pytest.raises(ValueError, match=r"The value test/missing_key
should be set!"):
test_conf.get_mandatory_list_value("test", "missing_key",
fallback=None)
    def test_as_dict_only_materializes_provider_sources_after_loading_providers(self):
        test_conf = AirflowConfigParser()

        # Before providers are loaded, as_dict() must not materialize the provider-metadata
        # fallback cached_property (its key would appear in the instance __dict__ if it did).
        test_conf.as_dict(display_source=True)
        assert "_provider_metadata_config_fallback_default_values" not in test_conf.__dict__

        # After loading providers, the same call is allowed (and expected) to materialize it.
        test_conf.load_providers_configuration()
        test_conf.as_dict(display_source=True)
        assert "_provider_metadata_config_fallback_default_values" in test_conf.__dict__
+
    def test_write_materializes_provider_sources_in_requested_context(self):
        test_conf = AirflowConfigParser()

        # write() with include_providers=False must not touch the provider-metadata fallback cache.
        test_conf.write(StringIO(), include_sources=True, include_providers=False)
        assert "_provider_metadata_config_fallback_default_values" not in test_conf.__dict__

        # With include_providers=True the cache is materialized inside the requested context.
        test_conf.write(StringIO(), include_sources=True, include_providers=True)
        assert "_provider_metadata_config_fallback_default_values" in test_conf.__dict__
+
    def test_get_uses_provider_metadata_fallback_before_loading_providers(self):
        # One provider contributing a single section with a single defaulted option.
        provider_configs = [
            (
                "apache-airflow-providers-test",
                {
                    "test_provider": {
                        "options": {
                            "test_option": {
                                "default": "provider-default",
                            }
                        }
                    }
                },
            )
        ]

        class ProvidersManagerWithConfig:
            # Exposes the metadata above for the fallback lookup path...
            @property
            def provider_configs(self):
                return provider_configs

            # ...but reports nothing as already initialized, so no full provider load occurs.
            @property
            def already_initialized_provider_configs(self):
                return []

        test_conf = AirflowConfigParser(
            provider_manager_type=ProvidersManagerWithConfig,
            create_default_config_parser_callable=_create_default_config_parser,
        )

        # get() must resolve via the provider-metadata fallback WITHOUT flipping the
        # providers-loaded flag or mutating the configuration description.
        assert test_conf._providers_configuration_loaded is False
        assert test_conf.configuration_description.get("test_provider") is None
        assert test_conf.get("test_provider", "test_option") == "provider-default"
        assert test_conf._providers_configuration_loaded is False
        assert test_conf.configuration_description.get("test_provider") is None
+
def test_set_case_insensitive(self):
# both get and set should be case insensitive
test_conf = AirflowConfigParser()
@@ -861,7 +970,14 @@ existing_list = one,two,three
configure_parser_from_configuration_description(
_default_values, configuration_description, {}
)
- _SharedAirflowConfigParser.__init__(self,
configuration_description, _default_values)
+ _SharedAirflowConfigParser.__init__(
+ self,
+ configuration_description,
+ _default_values,
+ _NoOpProvidersManager,
+ _create_empty_config_parser,
+ "",
+ )
test_conf = TestConfigParser()
deprecated_conf_list = [
diff --git a/task-sdk/src/airflow/sdk/configuration.py
b/task-sdk/src/airflow/sdk/configuration.py
index 30e073d9ee3..64bda4b3a56 100644
--- a/task-sdk/src/airflow/sdk/configuration.py
+++ b/task-sdk/src/airflow/sdk/configuration.py
@@ -125,11 +125,23 @@ class AirflowSDKConfigParser(_SharedAirflowConfigParser):
*args,
**kwargs,
):
+ # Imported lazily to preserve the module-level lazy ``conf``
initialization and avoid a
+ # configuration/providers_manager_runtime import cycle.
+ from airflow.sdk.providers_manager_runtime import
ProvidersManagerTaskRuntime
+
# Read Core's config.yml (Phase 1: shared config.yml)
configuration_description = retrieve_configuration_description()
# Create default values parser
_default_values =
create_default_config_parser(configuration_description)
- super().__init__(configuration_description, _default_values, *args,
**kwargs)
+ super().__init__(
+ configuration_description,
+ _default_values,
+ ProvidersManagerTaskRuntime,
+ create_default_config_parser,
+ _default_config_file_path("provider_config_fallback_defaults.cfg"),
+ *args,
+ **kwargs,
+ )
self.configuration_description = configuration_description
self._default_values = _default_values
self._suppress_future_warnings = False
@@ -145,6 +157,11 @@ class AirflowSDKConfigParser(_SharedAirflowConfigParser):
if default_config is not None:
self._update_defaults_from_string(default_config)
+ def _get_custom_secret_backend(self, worker_mode: bool | None = None) ->
Any | None:
+ return super()._get_custom_secret_backend(
+ worker_mode=worker_mode if worker_mode is not None else True
+ )
+
def expand_all_configuration_values(self):
"""Expand all configuration values using SDK-specific expansion
variables."""
all_vars = get_sdk_expansion_variables()
diff --git a/task-sdk/src/airflow/sdk/providers_manager_runtime.py
b/task-sdk/src/airflow/sdk/providers_manager_runtime.py
index e7b1b65e2bf..4d764596814 100644
--- a/task-sdk/src/airflow/sdk/providers_manager_runtime.py
+++ b/task-sdk/src/airflow/sdk/providers_manager_runtime.py
@@ -152,6 +152,8 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
self._plugins_set: set[PluginInfo] = set()
self._provider_schema_validator =
_create_provider_info_schema_validator()
self._init_airflow_core_hooks()
+ # Populated by initialize_provider_configs(); holds
provider-contributed config sections.
+ self._provider_configs: dict[str, dict[str, Any]] = {}
def _init_airflow_core_hooks(self):
"""Initialize the hooks dict with default hooks from Airflow core."""
@@ -218,6 +220,22 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
self.initialize_providers_list()
self._discover_taskflow_decorators()
    @provider_info_cache("provider_configs")
    def initialize_provider_configs(self):
        """Lazy initialization of provider configuration metadata and merge it into SDK ``conf``."""
        # NOTE(review): the decorator presumably makes this run at most once per manager
        # lifecycle — confirm against provider_info_cache's implementation.
        self.initialize_providers_list()
        self._discover_config()
        # Imported lazily to preserve SDK conf lazy initialization and avoid a configuration/runtime cycle.
        from airflow.sdk.configuration import conf

        conf.load_providers_configuration()
+
+ def _discover_config(self) -> None:
+ """Retrieve all configs defined in the providers."""
+ for provider_package, provider in self._provider_dict.items():
+ if config := provider.data.get("config"):
+ self._provider_configs[provider_package] = config
+
def _discover_hooks_from_connection_types(
self,
hook_class_names_registered: set[str],
@@ -597,6 +615,15 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
self.initialize_providers_plugins()
return sorted(self._plugins_set, key=lambda x: x.plugin_class)
+ @property
+ def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
+ self.initialize_provider_configs()
+ return sorted(self._provider_configs.items(), key=lambda x: x[0])
+
+ @property
+ def already_initialized_provider_configs(self) -> list[tuple[str,
dict[str, Any]]]:
+ return sorted(self._provider_configs.items(), key=lambda x: x[0])
+
def _cleanup(self):
self._initialized_cache.clear()
self._provider_dict.clear()
@@ -608,6 +635,12 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
self._asset_uri_handlers.clear()
self._asset_factories.clear()
self._asset_to_openlineage_converters.clear()
+ self._provider_configs.clear()
+
+ # Imported lazily to preserve SDK conf lazy initialization and avoid a
configuration/runtime cycle.
+ from airflow.sdk.configuration import conf
+
+ conf.invalidate_cache()
self._initialized = False
self._initialization_stack_trace = None
diff --git a/task-sdk/tests/task_sdk/test_providers_manager_runtime.py
b/task-sdk/tests/task_sdk/test_providers_manager_runtime.py
index da6600a6fda..aee5a115363 100644
--- a/task-sdk/tests/task_sdk/test_providers_manager_runtime.py
+++ b/task-sdk/tests/task_sdk/test_providers_manager_runtime.py
@@ -236,3 +236,38 @@ class TestProvidersManagerRuntime:
assert self._caplog.messages == [
"Optional provider feature disabled when importing 'HookClass'
from 'test_package' package"
]
+
    def test_initialize_provider_configs_can_reload_sdk_conf(self):
        from airflow.sdk.configuration import conf

        providers_manager = ProvidersManagerTaskRuntime()
        # Fake provider metadata contributing one section with one defaulted option.
        provider_config = {
            "test_sdk_provider": {
                "description": "Provider config used in runtime tests.",
                "options": {
                    "test_option": {
                        "default": "provider-default",
                    }
                },
            }
        }

        def initialize_provider_configs() -> None:
            # Register the fake provider, then initialize configs with real provider
            # discovery patched out so no entrypoint scanning happens.
            providers_manager._provider_dict["apache-airflow-providers-test-sdk"] = ProviderInfo(
                version="0.0.1",
                data={"config": provider_config},
            )
            with patch.object(providers_manager, "initialize_providers_list"):
                providers_manager.initialize_provider_configs()

        conf.restore_core_default_configuration()
        try:
            initialize_provider_configs()
            assert conf.get("test_sdk_provider", "test_option") == "provider-default"

            # _cleanup() clears provider configs and invalidates conf caches; a second
            # initialization must be able to repopulate conf from scratch.
            providers_manager._cleanup()

            initialize_provider_configs()
            assert conf.get("test_sdk_provider", "test_option") == "provider-default"
        finally:
            # Always restore core-only configuration so other tests see pristine conf.
            conf.restore_core_default_configuration()