This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f6dbffe81d Make TaskSDK conf respect default config from provider 
metadata (#62696)
2f6dbffe81d is described below

commit 2f6dbffe81d659f0d2d2cf1950c63d3241a4119c
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Mon Mar 23 16:54:27 2026 +0800

    Make TaskSDK conf respect default config from provider metadata (#62696)
    
    * Add lazy initialization for provider configs in 
ProvidersManagerTaskRuntime
    
    * Add common load_providers_configuration in shared lib
    
    * Add provider metadata config fallback lookup in core conf
    
    Fix _provider_metadata_config_fallback_default_values
    
    * Add provider lookup and related utils to shared
    
    * Add missing provider_configs property in ProvidersManagerTaskRuntime
    
    * Address all the comments from Kaxil
    
    - remove _get_config_sources_for_as_dict in child class
    - clarify cached_property and constructing ProvidersManagerTaskRuntime
      in SDK conf
    - _get_option_from_provider_metadata_config_fallbacks nit
    - ProvidersManager lazy import
    - provider.data.get("config") nit
    - remove print in prek check
    
    * Fix unit tests
    
    * Fix TestDeprecatedConf tests failing when run in isolation
    
    The 6 tests in TestDeprecatedConf that use cmd/secret config sources
    failed when running only TestDeprecatedConf (not the full file) because
    sensitive_config_values -- a cached_property on the config parser -- was
    computed during airflow initialization before the test module added its
    deprecated_options entries. The stale cache meant _include_commands and
    _include_secrets never processed ('core', 'sql_alchemy_conn').
    
    Invalidate both sensitive_config_values and inversed_deprecated_options
    caches right after modifying deprecated_options at module level.
    
    Co-authored-by: Cursor Agent <[email protected]>
    Co-authored-by: Jason(Zhe-You) Liu <[email protected]>
    
    * Respect worker_mode of _get_custom_secret_backend for conf
    
    Fix _get_custom_secret_backend
    
    * Fetch all the conf in constructor of SchedulerJobRunner to avoid hitting
    prohibit_commit block
    
    * Fix test_execute_task_instances_unlimited_multiple_executors
    
    - We need the conf_vars block at the constructor level as we will pre-fetch
      all the conf in the SchedulerJobRunner constructor
    
    * Fix otel integration test
    
    * Partially resolve Amogh's comments
    
    - Add invalidate_cache method at shared Parser
    - Unified initialize_providers_configuration for Core & SDK
      provider_manager
    - call conf.initialize_providers_configuration for both provider_manager
    - Replace manual clear cache with invalidate_cache method in
      test_configuration
    
    * Resolve Kaxil's second review comments
    
    * Fix test_write_default_config_contains_generated_secrets error
    
    * Address Copilot's comments
    
    * Address second phase of Copilot's comments
    
    ---------
    
    Co-authored-by: Cursor Agent <[email protected]>
    Co-authored-by: Jason(Zhe-You) Liu <[email protected]>
---
 airflow-core/src/airflow/configuration.py          | 143 ++------------
 .../src/airflow/jobs/scheduler_job_runner.py       |  26 ++-
 airflow-core/src/airflow/providers_manager.py      |  23 +--
 airflow-core/tests/integration/otel/test_otel.py   |   5 +
 airflow-core/tests/unit/core/test_configuration.py |   3 +
 airflow-core/tests/unit/jobs/test_scheduler_job.py |  10 +-
 scripts/ci/prek/check_airflow_imports_in_shared.py |  23 +++
 .../src/airflow_shared/configuration/parser.py     | 208 +++++++++++++++++++--
 .../tests/configuration/test_parser.py             | 122 +++++++++++-
 task-sdk/src/airflow/sdk/configuration.py          |  19 +-
 .../src/airflow/sdk/providers_manager_runtime.py   |  33 ++++
 .../task_sdk/test_providers_manager_runtime.py     |  35 ++++
 12 files changed, 478 insertions(+), 172 deletions(-)

diff --git a/airflow-core/src/airflow/configuration.py 
b/airflow-core/src/airflow/configuration.py
index 69d936be01d..1426199c7e0 100644
--- a/airflow-core/src/airflow/configuration.py
+++ b/airflow-core/src/airflow/configuration.py
@@ -16,7 +16,6 @@
 # under the License.
 from __future__ import annotations
 
-import contextlib
 import logging
 import os
 import pathlib
@@ -29,7 +28,6 @@ import warnings
 from base64 import b64encode
 from collections.abc import Callable
 from configparser import ConfigParser
-from copy import deepcopy
 from inspect import ismodule
 from io import StringIO
 from re import Pattern
@@ -39,9 +37,7 @@ from urllib.parse import urlsplit
 from typing_extensions import overload
 
 from airflow._shared.configuration.parser import (
-    VALUE_NOT_FOUND_SENTINEL,
     AirflowConfigParser as _SharedAirflowConfigParser,
-    ValueNotFound,
     configure_parser_from_configuration_description,
 )
 from airflow._shared.module_loading import import_string
@@ -207,10 +203,19 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
         # interpolation placeholders. The _default_values config parser will 
interpolate them
         # properly when we call get() on it.
         _default_values = 
create_default_config_parser(configuration_description)
-        super().__init__(configuration_description, _default_values, *args, 
**kwargs)
+        from airflow.providers_manager import ProvidersManager
+
+        super().__init__(
+            configuration_description,
+            _default_values,
+            ProvidersManager,
+            create_default_config_parser,
+            _default_config_file_path("provider_config_fallback_defaults.cfg"),
+            *args,
+            **kwargs,
+        )
         self.configuration_description = configuration_description
         self._default_values = _default_values
-        self._provider_config_fallback_default_values = 
create_provider_config_fallback_defaults()
         if default_config is not None:
             self._update_defaults_from_string(default_config)
         self._update_logging_deprecated_template_to_one_from_defaults()
@@ -228,35 +233,6 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
             self._upgrade_postgres_metastore_conn,
         ]
 
-    @property
-    def _lookup_sequence(self) -> list[Callable]:
-        """Overring _lookup_sequence from shared base class to add provider 
fallbacks."""
-        return super()._lookup_sequence + 
[self._get_option_from_provider_fallbacks]
-
-    def _get_config_sources_for_as_dict(self) -> list[tuple[str, 
ConfigParser]]:
-        """Override the base method to add provider fallbacks."""
-        return [
-            ("provider-fallback-defaults", 
self._provider_config_fallback_default_values),
-            ("default", self._default_values),
-            ("airflow.cfg", self),
-        ]
-
-    def _get_option_from_provider_fallbacks(
-        self,
-        deprecated_key: str | None,
-        deprecated_section: str | None,
-        key: str,
-        section: str,
-        issue_warning: bool = True,
-        extra_stacklevel: int = 0,
-        **kwargs,
-    ) -> str | ValueNotFound:
-        """Get config option from provider fallback defaults."""
-        if self.get_provider_config_fallback_defaults(section, key) is not 
None:
-            # no expansion needed
-            return self.get_provider_config_fallback_defaults(section, key, 
**kwargs)
-        return VALUE_NOT_FOUND_SENTINEL
-
     def _update_logging_deprecated_template_to_one_from_defaults(self):
         default = self.get_default_value("logging", "log_filename_template")
         if default:
@@ -267,13 +243,6 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
                 default,
             )
 
-    def get_provider_config_fallback_defaults(self, section: str, key: str, 
**kwargs) -> Any:
-        """Get provider config fallback default values."""
-        # Remove team_name from kwargs as the fallback defaults ConfigParser
-        # does not support team-aware lookups (it's a standard ConfigParser).
-        kwargs.pop("team_name", None)
-        return self._provider_config_fallback_default_values.get(section, key, 
fallback=None, **kwargs)
-
     # A mapping of old default values that we want to change and warn the user
     # about. Mapping of section -> setting -> { old, replace }
     deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {
@@ -403,7 +372,7 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
         if not self._providers_configuration_loaded:
             from airflow.providers_manager import ProvidersManager
 
-            ProvidersManager()._initialize_providers_configuration()
+            ProvidersManager().initialize_providers_configuration()
 
     def _ensure_providers_config_unloaded(self) -> bool:
         """Ensure providers configurations are unloaded temporarily to load 
core configs. Returns True if providers get unloaded."""
@@ -416,17 +385,6 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
         """Reload providers configuration."""
         self.load_providers_configuration()
 
-    def restore_core_default_configuration(self) -> None:
-        """
-        Restore default configuration for core Airflow.
-
-        It does not restore configuration for providers. If you want to 
restore configuration for
-        providers, you need to call ``load_providers_configuration`` method.
-        """
-        self.configuration_description = 
retrieve_configuration_description(include_providers=False)
-        self._default_values = 
create_default_config_parser(self.configuration_description)
-        self._providers_configuration_loaded = False
-
     def _upgrade_postgres_metastore_conn(self):
         """
         Upgrade SQL schemas.
@@ -490,6 +448,11 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
             f"See 
{get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
         )
 
+    def _get_custom_secret_backend(self, worker_mode: bool | None = None) -> 
Any | None:
+        return super()._get_custom_secret_backend(
+            worker_mode=worker_mode if worker_mode is not None else False
+        )
+
     def mask_secrets(self):
         from airflow._shared.configuration.parser import 
_build_kwarg_env_prefix, _collect_kwarg_env_vars
         from airflow._shared.secrets_masker import mask_secret as 
mask_secret_core
@@ -567,52 +530,6 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
         """Checks if providers have been loaded."""
         return self._providers_configuration_loaded
 
-    def load_providers_configuration(self):
-        """
-        Load configuration for providers.
-
-        This should be done after initial configuration have been performed. 
Initializing and discovering
-        providers is an expensive operation and cannot be performed when we 
load configuration for the first
-        time when airflow starts, because we initialize configuration very 
early, during importing of the
-        `airflow` package and the module is not yet ready to be used when it 
happens and until configuration
-        and settings are loaded. Therefore, in order to reload provider 
configuration we need to additionally
-        load provider - specific configuration.
-        """
-        log.debug("Loading providers configuration")
-        from airflow.providers_manager import ProvidersManager
-
-        self.restore_core_default_configuration()
-        for provider, config in 
ProvidersManager().already_initialized_provider_configs:
-            for provider_section, provider_section_content in config.items():
-                provider_options = provider_section_content["options"]
-                section_in_current_config = 
self.configuration_description.get(provider_section)
-                if not section_in_current_config:
-                    self.configuration_description[provider_section] = 
deepcopy(provider_section_content)
-                    section_in_current_config = 
self.configuration_description.get(provider_section)
-                    section_in_current_config["source"] = f"default-{provider}"
-                    for option in provider_options:
-                        section_in_current_config["options"][option]["source"] 
= f"default-{provider}"
-                else:
-                    section_source = section_in_current_config.get("source", 
"Airflow's core package").split(
-                        "default-"
-                    )[-1]
-                    raise AirflowConfigException(
-                        f"The provider {provider} is attempting to contribute "
-                        f"configuration section {provider_section} that "
-                        f"has already been added before. The source of it: 
{section_source}. "
-                        "This is forbidden. A provider can only add new 
sections. It "
-                        "cannot contribute options to existing sections or 
override other "
-                        "provider's configuration.",
-                        UserWarning,
-                    )
-        self._default_values = 
create_default_config_parser(self.configuration_description)
-        # sensitive_config_values needs to be refreshed here. This is a 
cached_property, so we can delete
-        # the cached values, and it will be refreshed on next access.
-        with contextlib.suppress(AttributeError):
-            # no problem if cache is not set yet
-            del self.sensitive_config_values
-        self._providers_configuration_loaded = True
-
     def _get_config_value_from_secret_backend(self, config_key: str) -> str | 
None:
         """
         Override to use module-level function that reads from global conf.
@@ -702,30 +619,6 @@ def 
create_default_config_parser(configuration_description: dict[str, dict[str,
     return parser
 
 
-def create_provider_config_fallback_defaults() -> ConfigParser:
-    """
-    Create fallback defaults.
-
-    This parser contains provider defaults for Airflow configuration, 
containing fallback default values
-    that might be needed when provider classes are being imported - before 
provider's configuration
-    is loaded.
-
-    Unfortunately airflow currently performs a lot of stuff during importing 
and some of that might lead
-    to retrieving provider configuration before the defaults for the provider 
are loaded.
-
-    Those are only defaults, so if you have "real" values configured in your 
configuration (.cfg file or
-    environment variables) those will be used as usual.
-
-    NOTE!! Do NOT attempt to remove those default fallbacks thinking that they 
are unnecessary duplication,
-    at least not until we fix the way how airflow imports "do stuff". This is 
unlikely to succeed.
-
-    You've been warned!
-    """
-    config_parser = ConfigParser()
-    
config_parser.read(_default_config_file_path("provider_config_fallback_defaults.cfg"))
-    return config_parser
-
-
 def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
     airflow_config = pathlib.Path(AIRFLOW_CONFIG)
     if airflow_config.is_dir():
@@ -754,11 +647,13 @@ def write_default_airflow_configuration_if_needed() -> 
AirflowConfigParser:
             
conf.configuration_description["core"]["options"]["fernet_key"]["default"] = (
                 _SecretKeys.fernet_key
             )
+            conf._default_values.set("core", "fernet_key", 
_SecretKeys.fernet_key)
 
         _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
         
conf.configuration_description["api_auth"]["options"]["jwt_secret"]["default"] 
= (
             _SecretKeys.jwt_secret_key
         )
+        conf._default_values.set("api_auth", "jwt_secret", 
_SecretKeys.jwt_secret_key)
         pathlib.Path(airflow_config.__fspath__()).touch()
         make_group_other_inaccessible(airflow_config.__fspath__())
         with open(airflow_config, "w") as file:
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index c64bd166f1b..bda95bbf4b0 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -271,6 +271,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         self.num_runs = num_runs
         self.only_idle = only_idle
         self._scheduler_idle_sleep_time = scheduler_idle_sleep_time
+
+        # Note:
+        # We need to fetch all conf values before the `prohibit_commit` block; 
otherwise the Core conf may
+        # access the MetadataMetastoreBackend and trigger `UNEXPECTED COMMIT - 
THIS WILL BREAK HA LOCKS`.
+        # The easiest way to keep the scheduler loop side-effect free is to 
read those values in `__init__`.
+
         # How many seconds do we wait for tasks to heartbeat before timeout.
         self._task_instance_heartbeat_timeout_secs = conf.getint(
             "scheduler", "task_instance_heartbeat_timeout"
@@ -284,6 +290,9 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             key="num_stuck_in_queued_retries",
             fallback=2,
         )
+        self._scheduler_use_job_schedule = conf.getboolean("scheduler", 
"use_job_schedule", fallback=True)
+        self._parallelism = conf.getint("core", "parallelism")
+        self._multi_team = conf.getboolean("core", "multi_team")
 
         self.executors: list[BaseExecutor] = executors if executors else 
ExecutorLoader.init_executors()
         self.executor: BaseExecutor = self.executors[0]
@@ -656,7 +665,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             self.log.info("%s tasks up for execution:\n%s", 
len(task_instances_to_examine), task_instance_str)
 
             dag_id_to_team_name: dict[str, str | None] = {}
-            if conf.getboolean("core", "multi_team"):
+            if self._multi_team:
                 # Batch query to resolve team names for all DAG IDs to 
optimize performance
                 # Instead of individual queries in _try_to_load_executor(), 
resolve all team names upfront
                 unique_dag_ids = {ti.dag_id for ti in 
task_instances_to_examine}
@@ -1005,11 +1014,10 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         # we need to make sure in the scheduler now that we don't schedule 
more than core.parallelism totally
         # across all executors.
         num_occupied_slots = sum([executor.slots_occupied for executor in 
self.executors])
-        parallelism = conf.getint("core", "parallelism")
         if self.job.max_tis_per_query == 0:
-            max_tis = parallelism - num_occupied_slots
+            max_tis = self._parallelism - num_occupied_slots
         else:
-            max_tis = min(self.job.max_tis_per_query, parallelism - 
num_occupied_slots)
+            max_tis = min(self.job.max_tis_per_query, self._parallelism - 
num_occupied_slots)
         if max_tis <= 0:
             self.log.debug("max_tis query size is less than or equal to zero. 
No query will be performed!")
             return 0
@@ -1039,7 +1047,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         :param session: The database session
         """
         num_occupied_slots = sum(executor.slots_occupied for executor in 
self.executors)
-        max_callbacks = conf.getint("core", "parallelism") - num_occupied_slots
+        max_callbacks = self._parallelism - num_occupied_slots
 
         if max_callbacks <= 0:
             self.log.debug("No available slots for callbacks; all executors at 
capacity")
@@ -1694,7 +1702,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         """
         # Put a check in place to make sure we don't commit unexpectedly
         with prohibit_commit(session) as guard:
-            if conf.getboolean("scheduler", "use_job_schedule", fallback=True):
+            if self._scheduler_use_job_schedule:
                 self._create_dagruns_for_dags(guard, session)
 
             self._start_queued_dagruns(session)
@@ -2874,7 +2882,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
     def _purge_task_instances_without_heartbeats(
         self, task_instances_without_heartbeats: list[TI], *, session: Session
     ) -> None:
-        if conf.getboolean("core", "multi_team"):
+        if self._multi_team:
             unique_dag_ids = {ti.dag_id for ti in 
task_instances_without_heartbeats}
             dag_id_to_team_name = 
self._get_team_names_for_dag_ids(unique_dag_ids, session)
         else:
@@ -3122,7 +3130,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
     ) -> dict[BaseExecutor, list[SchedulerWorkload]]:
         """Organize workloads into lists per their respective executor."""
         workloads_iter: Iterable[SchedulerWorkload]
-        if conf.getboolean("core", "multi_team"):
+        if self._multi_team:
             if dag_id_to_team_name is None:
                 if isinstance(workloads, list):
                     workloads_list = workloads
@@ -3170,7 +3178,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                          will query the database to resolve team name. None 
indicates global team.
         """
         executor = None
-        if conf.getboolean("core", "multi_team"):
+        if self._multi_team:
             # Use provided team_name if available, otherwise query the database
             if team_name is NOTSET:
                 team_name = self._get_workload_team_name(workload, session)
diff --git a/airflow-core/src/airflow/providers_manager.py 
b/airflow-core/src/airflow/providers_manager.py
index a28f1bd9274..02f5a6957c9 100644
--- a/airflow-core/src/airflow/providers_manager.py
+++ b/airflow-core/src/airflow/providers_manager.py
@@ -610,23 +610,10 @@ class ProvidersManager(LoggingMixin):
 
     @provider_info_cache("config")
     def initialize_providers_configuration(self):
-        """Lazy initialization of providers configuration information."""
-        self._initialize_providers_configuration()
-
-    def _initialize_providers_configuration(self):
-        """
-        Initialize providers configuration information.
-
-        Should be used if we do not want to trigger caching for 
``initialize_providers_configuration`` method.
-        In some cases we might want to make sure that the configuration is 
initialized, but we do not want
-        to cache the initialization method - for example when we just want to 
write configuration with
-        providers, but it is used in the context where no providers are loaded 
yet we will eventually
-        restore the original configuration and we want the subsequent 
``initialize_providers_configuration``
-        method to be run in order to load the configuration for providers 
again.
-        """
+        """Lazy initialization of provider configuration metadata and merge it 
into ``conf``."""
         self.initialize_providers_list()
         self._discover_config()
-        # Now update conf with the new provider configuration from providers
+        # Imported lazily to avoid a configuration/providers_manager import 
cycle during startup.
         from airflow.configuration import conf
 
         conf.load_providers_configuration()
@@ -1491,6 +1478,12 @@ class ProvidersManager(LoggingMixin):
         self._executor_without_check_set.clear()
         self._queue_class_name_set.clear()
         self._provider_configs.clear()
+
+        # Imported lazily to avoid a configuration/providers_manager import 
cycle during cleanup.
+        from airflow.configuration import conf
+
+        conf.invalidate_cache()
+
         self._trigger_info_set.clear()
         self._notification_info_set.clear()
         self._plugins_set.clear()
diff --git a/airflow-core/tests/integration/otel/test_otel.py 
b/airflow-core/tests/integration/otel/test_otel.py
index c9bd4ad8b78..0d40156c45e 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -209,6 +209,11 @@ class TestOtelIntegration:
         # during scheduler handoff (see 
https://github.com/apache/airflow/issues/61070).
         wait_for_otel_collector(otel_host, otel_port)
 
+        # The pytest plugin strips AIRFLOW__*__* env vars (including the JWT 
secret set
+        # by Breeze). Both the scheduler and api-server subprocesses must 
share the same
+        # secret; otherwise each generates its own random key and token 
verification fails.
+        os.environ["AIRFLOW__API_AUTH__JWT_SECRET"] = 
"test-secret-key-for-testing"
+        os.environ["AIRFLOW__API_AUTH__JWT_ISSUER"] = "airflow"
         os.environ["AIRFLOW__TRACES__OTEL_ON"] = "True"
         os.environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "http/protobuf"
         os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = 
"http://breeze-otel-collector:4318/v1/traces"
diff --git a/airflow-core/tests/unit/core/test_configuration.py 
b/airflow-core/tests/unit/core/test_configuration.py
index be0f191f8c9..9ee2f737533 100644
--- a/airflow-core/tests/unit/core/test_configuration.py
+++ b/airflow-core/tests/unit/core/test_configuration.py
@@ -64,6 +64,9 @@ conf.deprecated_options[("scheduler", 
"parsing_cleanup_interval")] = (
     "deactivate_stale_dags_interval",
     "2.5.0",
 )
+# Invalidate cached properties that depend on deprecated_options, since they 
may have been
+# computed during airflow initialization before the entries above were added.
+conf.invalidate_cache()
 
 
 @pytest.fixture(scope="module", autouse=True)
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 997ebd24bcf..f9ae60a9c35 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -2779,7 +2779,10 @@ class TestSchedulerJob:
             task2 = EmptyOperator(task_id=task_id_2, executor=task2_exec)
 
         scheduler_job = Job()
-        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+        with conf_vars({("core", "parallelism"): "40"}):
+            # 40 dag runs * 2 tasks each = 80. Two executors have capacity for 
61 concurrent jobs, but they
+            # together respect core.parallelism and will not run more in 
aggregate then that allows.
+            self.job_runner = SchedulerJobRunner(job=scheduler_job)
 
         def _create_dagruns():
             dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, 
state=State.RUNNING)
@@ -2804,10 +2807,7 @@ class TestSchedulerJob:
             executor.slots_available = 31
 
         total_enqueued = 0
-        with conf_vars({("core", "parallelism"): "40"}):
-            # 40 dag runs * 2 tasks each = 80. Two executors have capacity for 
61 concurrent jobs, but they
-            # together respect core.parallelism and will not run more in 
aggregate then that allows.
-            total_enqueued += 
self.job_runner._critical_section_enqueue_task_instances(session)
+        total_enqueued += 
self.job_runner._critical_section_enqueue_task_instances(session)
 
         if task1_exec != task2_exec:
             # Two executors will execute up to core parallelism
diff --git a/scripts/ci/prek/check_airflow_imports_in_shared.py 
b/scripts/ci/prek/check_airflow_imports_in_shared.py
index 63dd94e090f..ece603ec11a 100755
--- a/scripts/ci/prek/check_airflow_imports_in_shared.py
+++ b/scripts/ci/prek/check_airflow_imports_in_shared.py
@@ -32,6 +32,26 @@ from pathlib import Path
 from common_prek_utils import console
 
 
+def _is_type_checking_guard(node: ast.If) -> bool:
+    """Check if an If node is a ``TYPE_CHECKING`` guard."""
+    test = node.test
+    if isinstance(test, ast.Name) and test.id == "TYPE_CHECKING":
+        return True
+    if isinstance(test, ast.Attribute) and test.attr == "TYPE_CHECKING":
+        return True
+    return False
+
+
+def _collect_type_checking_node_ids(tree: ast.AST) -> set[int]:
+    """Return the ``id()`` of every AST node nested inside an ``if 
TYPE_CHECKING`` block."""
+    guarded: set[int] = set()
+    for node in ast.walk(tree):
+        if isinstance(node, ast.If) and _is_type_checking_guard(node):
+            for child in ast.walk(node):
+                guarded.add(id(child))
+    return guarded
+
+
 def check_file_for_prohibited_imports(file_path: Path) -> list[tuple[int, 
str]]:
     try:
         source = file_path.read_text(encoding="utf-8")
@@ -39,9 +59,12 @@ def check_file_for_prohibited_imports(file_path: Path) -> 
list[tuple[int, str]]:
     except (OSError, UnicodeDecodeError, SyntaxError):
         return []
 
+    type_checking_ids = _collect_type_checking_node_ids(tree)
     violations = []
 
     for node in ast.walk(tree):
+        if id(node) in type_checking_ids:
+            continue
         # Check `from airflow.x import y` statements
         if isinstance(node, ast.ImportFrom):
             if node.module and node.module.startswith("airflow."):
diff --git a/shared/configuration/src/airflow_shared/configuration/parser.py 
b/shared/configuration/src/airflow_shared/configuration/parser.py
index b06f9611a98..55f24c529e0 100644
--- a/shared/configuration/src/airflow_shared/configuration/parser.py
+++ b/shared/configuration/src/airflow_shared/configuration/parser.py
@@ -33,10 +33,11 @@ import warnings
 from collections.abc import Callable, Generator, Iterable
 from configparser import ConfigParser, NoOptionError, NoSectionError
 from contextlib import contextmanager
+from copy import deepcopy
 from enum import Enum
 from json.decoder import JSONDecodeError
 from re import Pattern
-from typing import IO, Any, TypeVar, overload
+from typing import IO, TYPE_CHECKING, Any, TypeVar, overload
 
 from .exceptions import AirflowConfigException
 
@@ -78,6 +79,11 @@ ConfigSourcesType = dict[str, ConfigSectionSourcesType]
 ENV_VAR_PREFIX = "AIRFLOW__"
 
 
+if TYPE_CHECKING:
+    from airflow.providers_manager import ProvidersManager
+    from airflow.sdk.providers_manager_runtime import 
ProvidersManagerTaskRuntime
+
+
 class ValueNotFound:
     """Object of this is raised when a configuration value cannot be found."""
 
@@ -166,6 +172,34 @@ def configure_parser_from_configuration_description(
                         parser.set(section, key, default_value)
 
 
+def create_provider_cfg_config_fallback_defaults(
+    provider_config_fallback_defaults_cfg_path: str,
+) -> ConfigParser:
+    """
+    Create fallback defaults for configuration.
+
+    This parser contains provider defaults for Airflow configuration, 
containing fallback default values
+    that might be needed when provider classes are being imported - before 
provider's configuration
+    is loaded.
+
+    Unfortunately airflow currently performs a lot of stuff during importing 
and some of that might lead
+    to retrieving provider configuration before the defaults for the provider 
are loaded.
+
+    Those are only defaults, so if you have "real" values configured in your 
configuration (.cfg file or
+    environment variables) those will be used as usual.
+
+    NOTE!! Do NOT attempt to remove those default fallbacks thinking that they 
are unnecessary duplication,
+    at least not until we fix the way how airflow imports "do stuff". This is 
unlikely to succeed.
+
+    You've been warned!
+
+    :param provider_config_fallback_defaults_cfg_path: path to the provider 
config fallback defaults .cfg file
+    """
+    config_parser = ConfigParser()
+    config_parser.read(provider_config_fallback_defaults_cfg_path)
+    return config_parser
+
+
 class AirflowConfigParser(ConfigParser):
     """
     Base configuration parser with pure parsing logic.
@@ -241,7 +275,85 @@ class AirflowConfigParser(ConfigParser):
             self._get_option_from_commands,
             self._get_option_from_secrets,
             self._get_option_from_defaults,
+            self._get_option_from_provider_cfg_config_fallbacks,
+            self._get_option_from_provider_metadata_config_fallbacks,
+        ]
+
+    def _get_config_sources_for_as_dict(self) -> list[tuple[str, 
ConfigParser]]:
+        """Return the sources used by as_dict(), adding provider fallbacks once 
providers are loaded."""
+        sources: list[tuple[str, ConfigParser]] = [
+            ("default", self._default_values),
+            ("airflow.cfg", self),
         ]
+        if self._providers_configuration_loaded:
+            sources.insert(
+                0,
+                (
+                    "provider-metadata-fallback-defaults",
+                    self._provider_metadata_config_fallback_default_values,
+                ),
+            )
+            sources.insert(
+                0,
+                ("provider-cfg-fallback-defaults", 
self._provider_cfg_config_fallback_default_values),
+            )
+        return sources
+
+    def _get_option_from_provider_cfg_config_fallbacks(
+        self,
+        deprecated_key: str | None,
+        deprecated_section: str | None,
+        key: str,
+        section: str,
+        issue_warning: bool = True,
+        extra_stacklevel: int = 0,
+        **kwargs,
+    ) -> str | ValueNotFound:
+        """Get config option from provider fallback defaults."""
+        value = self.get_from_provider_cfg_config_fallback_defaults(section, 
key, **kwargs)
+        if value is not None:
+            return value
+        return VALUE_NOT_FOUND_SENTINEL
+
+    def _get_option_from_provider_metadata_config_fallbacks(
+        self,
+        deprecated_key: str | None,
+        deprecated_section: str | None,
+        key: str,
+        section: str,
+        issue_warning: bool = True,
+        extra_stacklevel: int = 0,
+        **kwargs,
+    ) -> str | ValueNotFound:
+        """Get config option from provider metadata fallback defaults."""
+        value = 
self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs)
+        if value is not None:
+            return value
+        return VALUE_NOT_FOUND_SENTINEL
+
+    def get_from_provider_cfg_config_fallback_defaults(self, section: str, 
key: str, **kwargs) -> Any:
+        """Get provider config fallback default values."""
+        raw = kwargs.get("raw", False)
+        vars_ = kwargs.get("vars")
+        return self._provider_cfg_config_fallback_default_values.get(
+            section, key, fallback=None, raw=raw, vars=vars_
+        )
+
+    @functools.cached_property
+    def _provider_metadata_config_fallback_default_values(self) -> 
ConfigParser:
+        """Return Provider metadata config fallback default values."""
+        base_configuration_description: dict[str, dict[str, Any]] = {}
+        for _, config in self._provider_manager_type().provider_configs:
+            base_configuration_description.update(config)
+        return 
self._create_default_config_parser_callable(base_configuration_description)
+
+    def get_from_provider_metadata_config_fallback_defaults(self, section: 
str, key: str, **kwargs) -> Any:
+        """Get provider metadata config fallback default values."""
+        raw = kwargs.get("raw", False)
+        vars_ = kwargs.get("vars")
+        return self._provider_metadata_config_fallback_default_values.get(
+            section, key, fallback=None, raw=raw, vars=vars_
+        )
 
     @property
     def _validators(self) -> list[Callable[[], None]]:
@@ -300,6 +412,9 @@ class AirflowConfigParser(ConfigParser):
         self,
         configuration_description: dict[str, dict[str, Any]],
         _default_values: ConfigParser,
+        provider_manager_type: type[ProvidersManager] | 
type[ProvidersManagerTaskRuntime],
+        create_default_config_parser_callable: Callable[[dict[str, dict[str, 
Any]]], ConfigParser],
+        provider_config_fallback_defaults_cfg_path: str,
         *args,
         **kwargs,
     ):
@@ -308,12 +423,36 @@ class AirflowConfigParser(ConfigParser):
 
         :param configuration_description: Description of configuration options
         :param _default_values: ConfigParser with default values
+        :param provider_manager_type: Either ProvidersManager or 
ProvidersManagerTaskRuntime, depending on the context of the caller.
+        :param create_default_config_parser_callable: The 
`create_default_config_parser` function from core or SDK, depending on the 
context of the caller.
+        :param provider_config_fallback_defaults_cfg_path: Path to the 
`provider_config_fallback_defaults.cfg` file.
         """
         super().__init__(*args, **kwargs)
         self.configuration_description = configuration_description
+        self._base_configuration_description = 
deepcopy(configuration_description)
         self._default_values = _default_values
+        self._provider_manager_type = provider_manager_type
+        self._create_default_config_parser_callable = 
create_default_config_parser_callable
+        self._provider_cfg_config_fallback_default_values = 
create_provider_cfg_config_fallback_defaults(
+            provider_config_fallback_defaults_cfg_path
+        )
         self._suppress_future_warnings = False
         self.upgraded_values: dict[tuple[str, str], str] = {}
+        self._providers_configuration_loaded = False
+
+    def invalidate_cache(self) -> None:
+        """
+        Clear all ``functools.cached_property`` entries on this instance.
+
+        Call this after mutating class-level attributes (e.g. 
``deprecated_options``)
+        so that derived cached properties are recomputed on next access.
+        """
+        for attr_name in (
+            name
+            for name in dir(type(self))
+            if isinstance(getattr(type(self), name, None), 
functools.cached_property)
+        ):
+            self.__dict__.pop(attr_name, None)
 
     @functools.cached_property
     def inversed_deprecated_options(self):
@@ -1046,6 +1185,56 @@ class AirflowConfigParser(ConfigParser):
 
         return section, key, deprecated_section, deprecated_key, 
warning_emitted
 
+    def load_providers_configuration(self) -> None:
+        """
+        Load configuration for providers.
+
+        This should be done after initial configuration has been performed. 
Initializing and discovering
+        providers is an expensive operation and cannot be performed when we 
load configuration for the first
+        time when airflow starts, because we initialize configuration very 
early, during importing of the
+        `airflow` package and the module is not yet ready to be used when it 
happens and until configuration
+        and settings are loaded. Therefore, in order to reload provider 
configuration we need to additionally
+        load provider-specific configuration.
+        """
+        log.debug("Loading providers configuration")
+
+        self.restore_core_default_configuration()
+        for provider, config in 
self._provider_manager_type().already_initialized_provider_configs:
+            for provider_section, provider_section_content in config.items():
+                provider_options = provider_section_content["options"]
+                section_in_current_config = 
self.configuration_description.get(provider_section)
+                if not section_in_current_config:
+                    self.configuration_description[provider_section] = 
deepcopy(provider_section_content)
+                    section_in_current_config = 
self.configuration_description.get(provider_section)
+                    section_in_current_config["source"] = f"default-{provider}"
+                    for option in provider_options:
+                        section_in_current_config["options"][option]["source"] 
= f"default-{provider}"
+                else:
+                    section_source = section_in_current_config.get("source", 
"Airflow's core package").split(
+                        "default-"
+                    )[-1]
+                    raise AirflowConfigException(
+                        f"The provider {provider} is attempting to contribute "
+                        f"configuration section {provider_section} that "
+                        f"has already been added before. The source of it: 
{section_source}. "
+                        "This is forbidden. A provider can only add new 
sections. It "
+                        "cannot contribute options to existing sections or 
override other "
+                        "provider's configuration.",
+                        UserWarning,
+                    )
+        self._default_values = 
self._create_default_config_parser_callable(self.configuration_description)
+        # Cached properties derived from configuration_description (e.g. 
sensitive_config_values) need
+        # to be recomputed now that provider config has been merged in.
+        self.invalidate_cache()
+        self._providers_configuration_loaded = True
+
+    def restore_core_default_configuration(self) -> None:
+        """Restore the parser to its state from before provider-contributed sections were 
loaded."""
+        self.configuration_description = 
deepcopy(self._base_configuration_description)
+        self._default_values = 
self._create_default_config_parser_callable(self.configuration_description)
+        self.invalidate_cache()
+        self._providers_configuration_loaded = False
+
     @overload  # type: ignore[override]
     def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> 
str: ...
 
@@ -1428,17 +1617,6 @@ class AirflowConfigParser(ConfigParser):
         """
         return optionstr
 
-    def _get_config_sources_for_as_dict(self) -> list[tuple[str, 
ConfigParser]]:
-        """
-        Get list of config sources to use in as_dict().
-
-        Subclasses can override to add additional sources (e.g., provider 
configs).
-        """
-        return [
-            ("default", self._default_values),
-            ("airflow.cfg", self),
-        ]
-
     def as_dict(
         self,
         display_source: bool = False,
@@ -1741,10 +1919,10 @@ class AirflowConfigParser(ConfigParser):
         :param extra_spacing: Add extra spacing before examples and after 
variables
         :param only_defaults: Only include default values when writing the 
config, not the actual values
         """
-        sources_dict = {}
-        if include_sources:
-            sources_dict = self.as_dict(display_source=True)
         with 
self.make_sure_configuration_loaded(with_providers=include_providers):
+            sources_dict = {}
+            if include_sources:
+                sources_dict = self.as_dict(display_source=True)
             for section_to_write in self.get_sections_including_defaults():
                 section_config_description = 
self.configuration_description.get(section_to_write, {})
                 if section_to_write != section and section is not None:
diff --git a/shared/configuration/tests/configuration/test_parser.py 
b/shared/configuration/tests/configuration/test_parser.py
index a018a931ed5..45b8ee1fd57 100644
--- a/shared/configuration/tests/configuration/test_parser.py
+++ b/shared/configuration/tests/configuration/test_parser.py
@@ -26,6 +26,7 @@ import re
 import textwrap
 from configparser import ConfigParser
 from enum import Enum
+from io import StringIO
 from unittest.mock import patch
 
 import pytest
@@ -37,10 +38,39 @@ from airflow_shared.configuration.parser import (
 )
 
 
+class _NoOpProvidersManager:
+    """Stub providers manager for tests — no providers, no side effects."""
+
+    @property
+    def provider_configs(self):
+        return []
+
+    @property
+    def already_initialized_provider_configs(self):
+        return []
+
+
+def _create_empty_config_parser(desc: dict) -> ConfigParser:
+    return ConfigParser()
+
+
+def _create_default_config_parser(desc: dict) -> ConfigParser:
+    parser = ConfigParser()
+    configure_parser_from_configuration_description(parser, desc, {})
+    return parser
+
+
 class AirflowConfigParser(_SharedAirflowConfigParser):
     """Test parser that extends shared parser for testing."""
 
-    def __init__(self, default_config: str | None = None, *args, **kwargs):
+    def __init__(
+        self,
+        default_config: str | None = None,
+        provider_manager_type=_NoOpProvidersManager,
+        create_default_config_parser_callable=_create_empty_config_parser,
+        *args,
+        **kwargs,
+    ):
         configuration_description = {
             "test": {
                 "options": {
@@ -53,7 +83,15 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
         _default_values.add_section("test")
         _default_values.set("test", "key1", "default_value")
         _default_values.set("test", "key2", "123")
-        super().__init__(configuration_description, _default_values, *args, 
**kwargs)
+        super().__init__(
+            configuration_description,
+            _default_values,
+            provider_manager_type,
+            create_default_config_parser_callable,
+            "",
+            *args,
+            **kwargs,
+        )
         self.configuration_description = configuration_description
         self._default_values = _default_values
         self._suppress_future_warnings = False
@@ -71,6 +109,22 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
             for key, value in parser.items(section):
                 self._default_values.set(section, key, value)
 
+    def _ensure_providers_config_loaded(self) -> None:
+        """Load provider configuration for tests when requested."""
+        if not self._providers_configuration_loaded:
+            self.load_providers_configuration()
+
+    def _ensure_providers_config_unloaded(self) -> bool:
+        """Unload provider configuration for tests when requested."""
+        if self._providers_configuration_loaded:
+            self.restore_core_default_configuration()
+            return True
+        return False
+
+    def _reload_provider_configs(self) -> None:
+        """Reload provider configuration for tests after temporary unloads."""
+        self.load_providers_configuration()
+
 
 class TestAirflowConfigParser:
     """Test the shared AirflowConfigParser parser methods."""
@@ -782,6 +836,61 @@ existing_list = one,two,three
         with pytest.raises(ValueError, match=r"The value test/missing_key 
should be set!"):
             test_conf.get_mandatory_list_value("test", "missing_key", 
fallback=None)
 
+    def 
test_as_dict_only_materializes_provider_sources_after_loading_providers(self):
+        test_conf = AirflowConfigParser()
+
+        test_conf.as_dict(display_source=True)
+        assert "_provider_metadata_config_fallback_default_values" not in 
test_conf.__dict__
+
+        test_conf.load_providers_configuration()
+        test_conf.as_dict(display_source=True)
+        assert "_provider_metadata_config_fallback_default_values" in 
test_conf.__dict__
+
+    def test_write_materializes_provider_sources_in_requested_context(self):
+        test_conf = AirflowConfigParser()
+
+        test_conf.write(StringIO(), include_sources=True, 
include_providers=False)
+        assert "_provider_metadata_config_fallback_default_values" not in 
test_conf.__dict__
+
+        test_conf.write(StringIO(), include_sources=True, 
include_providers=True)
+        assert "_provider_metadata_config_fallback_default_values" in 
test_conf.__dict__
+
+    def 
test_get_uses_provider_metadata_fallback_before_loading_providers(self):
+        provider_configs = [
+            (
+                "apache-airflow-providers-test",
+                {
+                    "test_provider": {
+                        "options": {
+                            "test_option": {
+                                "default": "provider-default",
+                            }
+                        }
+                    }
+                },
+            )
+        ]
+
+        class ProvidersManagerWithConfig:
+            @property
+            def provider_configs(self):
+                return provider_configs
+
+            @property
+            def already_initialized_provider_configs(self):
+                return []
+
+        test_conf = AirflowConfigParser(
+            provider_manager_type=ProvidersManagerWithConfig,
+            
create_default_config_parser_callable=_create_default_config_parser,
+        )
+
+        assert test_conf._providers_configuration_loaded is False
+        assert test_conf.configuration_description.get("test_provider") is None
+        assert test_conf.get("test_provider", "test_option") == 
"provider-default"
+        assert test_conf._providers_configuration_loaded is False
+        assert test_conf.configuration_description.get("test_provider") is None
+
     def test_set_case_insensitive(self):
         # both get and set should be case insensitive
         test_conf = AirflowConfigParser()
@@ -861,7 +970,14 @@ existing_list = one,two,three
                 configure_parser_from_configuration_description(
                     _default_values, configuration_description, {}
                 )
-                _SharedAirflowConfigParser.__init__(self, 
configuration_description, _default_values)
+                _SharedAirflowConfigParser.__init__(
+                    self,
+                    configuration_description,
+                    _default_values,
+                    _NoOpProvidersManager,
+                    _create_empty_config_parser,
+                    "",
+                )
 
         test_conf = TestConfigParser()
         deprecated_conf_list = [
diff --git a/task-sdk/src/airflow/sdk/configuration.py 
b/task-sdk/src/airflow/sdk/configuration.py
index 30e073d9ee3..64bda4b3a56 100644
--- a/task-sdk/src/airflow/sdk/configuration.py
+++ b/task-sdk/src/airflow/sdk/configuration.py
@@ -125,11 +125,23 @@ class AirflowSDKConfigParser(_SharedAirflowConfigParser):
         *args,
         **kwargs,
     ):
+        # Imported lazily to preserve the module-level lazy ``conf`` 
initialization and avoid a
+        # configuration/providers_manager_runtime import cycle.
+        from airflow.sdk.providers_manager_runtime import 
ProvidersManagerTaskRuntime
+
         # Read Core's config.yml (Phase 1: shared config.yml)
         configuration_description = retrieve_configuration_description()
         # Create default values parser
         _default_values = 
create_default_config_parser(configuration_description)
-        super().__init__(configuration_description, _default_values, *args, 
**kwargs)
+        super().__init__(
+            configuration_description,
+            _default_values,
+            ProvidersManagerTaskRuntime,
+            create_default_config_parser,
+            _default_config_file_path("provider_config_fallback_defaults.cfg"),
+            *args,
+            **kwargs,
+        )
         self.configuration_description = configuration_description
         self._default_values = _default_values
         self._suppress_future_warnings = False
@@ -145,6 +157,11 @@ class AirflowSDKConfigParser(_SharedAirflowConfigParser):
         if default_config is not None:
             self._update_defaults_from_string(default_config)
 
+    def _get_custom_secret_backend(self, worker_mode: bool | None = None) -> 
Any | None:
+        return super()._get_custom_secret_backend(
+            worker_mode=worker_mode if worker_mode is not None else True
+        )
+
     def expand_all_configuration_values(self):
         """Expand all configuration values using SDK-specific expansion 
variables."""
         all_vars = get_sdk_expansion_variables()
diff --git a/task-sdk/src/airflow/sdk/providers_manager_runtime.py 
b/task-sdk/src/airflow/sdk/providers_manager_runtime.py
index e7b1b65e2bf..4d764596814 100644
--- a/task-sdk/src/airflow/sdk/providers_manager_runtime.py
+++ b/task-sdk/src/airflow/sdk/providers_manager_runtime.py
@@ -152,6 +152,8 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
         self._plugins_set: set[PluginInfo] = set()
         self._provider_schema_validator = 
_create_provider_info_schema_validator()
         self._init_airflow_core_hooks()
+        # Populated by initialize_provider_configs(); holds 
provider-contributed config sections.
+        self._provider_configs: dict[str, dict[str, Any]] = {}
 
     def _init_airflow_core_hooks(self):
         """Initialize the hooks dict with default hooks from Airflow core."""
@@ -218,6 +220,22 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
         self.initialize_providers_list()
         self._discover_taskflow_decorators()
 
+    @provider_info_cache("provider_configs")
+    def initialize_provider_configs(self):
+        """Lazily initialize provider configuration metadata and merge it 
into SDK ``conf``."""
+        self.initialize_providers_list()
+        self._discover_config()
+        # Imported lazily to preserve SDK conf lazy initialization and avoid a 
configuration/runtime cycle.
+        from airflow.sdk.configuration import conf
+
+        conf.load_providers_configuration()
+
+    def _discover_config(self) -> None:
+        """Retrieve all configs defined in the providers."""
+        for provider_package, provider in self._provider_dict.items():
+            if config := provider.data.get("config"):
+                self._provider_configs[provider_package] = config
+
     def _discover_hooks_from_connection_types(
         self,
         hook_class_names_registered: set[str],
@@ -597,6 +615,15 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
         self.initialize_providers_plugins()
         return sorted(self._plugins_set, key=lambda x: x.plugin_class)
 
+    @property
+    def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
+        self.initialize_provider_configs()
+        return sorted(self._provider_configs.items(), key=lambda x: x[0])
+
+    @property
+    def already_initialized_provider_configs(self) -> list[tuple[str, 
dict[str, Any]]]:
+        return sorted(self._provider_configs.items(), key=lambda x: x[0])
+
     def _cleanup(self):
         self._initialized_cache.clear()
         self._provider_dict.clear()
@@ -608,6 +635,12 @@ class ProvidersManagerTaskRuntime(LoggingMixin):
         self._asset_uri_handlers.clear()
         self._asset_factories.clear()
         self._asset_to_openlineage_converters.clear()
+        self._provider_configs.clear()
+
+        # Imported lazily to preserve SDK conf lazy initialization and avoid a 
configuration/runtime cycle.
+        from airflow.sdk.configuration import conf
+
+        conf.invalidate_cache()
 
         self._initialized = False
         self._initialization_stack_trace = None
diff --git a/task-sdk/tests/task_sdk/test_providers_manager_runtime.py 
b/task-sdk/tests/task_sdk/test_providers_manager_runtime.py
index da6600a6fda..aee5a115363 100644
--- a/task-sdk/tests/task_sdk/test_providers_manager_runtime.py
+++ b/task-sdk/tests/task_sdk/test_providers_manager_runtime.py
@@ -236,3 +236,38 @@ class TestProvidersManagerRuntime:
             assert self._caplog.messages == [
                 "Optional provider feature disabled when importing 'HookClass' 
from 'test_package' package"
             ]
+
+    def test_initialize_provider_configs_can_reload_sdk_conf(self):
+        from airflow.sdk.configuration import conf
+
+        providers_manager = ProvidersManagerTaskRuntime()
+        provider_config = {
+            "test_sdk_provider": {
+                "description": "Provider config used in runtime tests.",
+                "options": {
+                    "test_option": {
+                        "default": "provider-default",
+                    }
+                },
+            }
+        }
+
+        def initialize_provider_configs() -> None:
+            
providers_manager._provider_dict["apache-airflow-providers-test-sdk"] = 
ProviderInfo(
+                version="0.0.1",
+                data={"config": provider_config},
+            )
+            with patch.object(providers_manager, "initialize_providers_list"):
+                providers_manager.initialize_provider_configs()
+
+        conf.restore_core_default_configuration()
+        try:
+            initialize_provider_configs()
+            assert conf.get("test_sdk_provider", "test_option") == 
"provider-default"
+
+            providers_manager._cleanup()
+
+            initialize_provider_configs()
+            assert conf.get("test_sdk_provider", "test_option") == 
"provider-default"
+        finally:
+            conf.restore_core_default_configuration()

Reply via email to