This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 56c41d460c Introduce decorator to load providers configuration (#32765)
56c41d460c is described below

commit 56c41d460c3f2a4e871c7834033c3152e71f71d2
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Jul 22 21:38:58 2023 +0200

    Introduce decorator to load providers configuration (#32765)
    
    A number of commands in Airflow rely on the fact that
    providers configuration is loaded. This is a rather fast operation,
    as it does not involve any importing of provider classes — just
    discovering entrypoints, running them and parsing yaml configuration —
    so it takes very little (sub-second) time to do.
    
    We cannot do it once in settings/config because we actually need
    settings/config to be pre-initialized without providers in order
    to be able to bootstrap airflow; therefore, we need to run
    it individually in each command that can be run with the
    "airflow" entrypoint. A decorator seems best suited to do
    the job:
    
    * it is easy to apply, and hard to forget when you create another
      command and look at other commands
    * it nicely wraps around the local ProvidersManager import
    
    There are exceptions for the "version" and "providers lazy-loaded"
    commands because they are NOT supposed to initialize configuration
    of providers.
---
 airflow/__main__.py                                | 11 +++--
 airflow/cli/cli_config.py                          |  6 +--
 airflow/cli/commands/celery_command.py             |  5 ++
 airflow/cli/commands/config_command.py             |  6 +--
 airflow/cli/commands/connection_command.py         |  8 ++++
 airflow/cli/commands/dag_command.py                | 23 ++++++++-
 airflow/cli/commands/dag_processor_command.py      |  2 +
 airflow/cli/commands/db_command.py                 | 11 +++++
 airflow/cli/commands/info_command.py               |  2 +
 airflow/cli/commands/internal_api_command.py       |  2 +
 airflow/cli/commands/jobs_command.py               |  2 +
 airflow/cli/commands/kerberos_command.py           |  2 +
 airflow/cli/commands/kubernetes_command.py         |  3 ++
 airflow/cli/commands/plugins_command.py            |  2 +
 airflow/cli/commands/pool_command.py               |  7 +++
 airflow/cli/commands/provider_command.py           | 17 ++++++-
 airflow/cli/commands/role_command.py               |  7 +++
 airflow/cli/commands/rotate_fernet_key_command.py  |  2 +
 airflow/cli/commands/scheduler_command.py          |  2 +
 airflow/cli/commands/standalone_command.py         |  2 +
 airflow/cli/commands/sync_perm_command.py          |  2 +
 airflow/cli/commands/task_command.py               |  7 +++
 airflow/cli/commands/triggerer_command.py          |  2 +
 airflow/cli/commands/user_command.py               |  7 +++
 airflow/cli/commands/variable_command.py           |  7 +++
 airflow/cli/commands/webserver_command.py          |  2 +
 .../celery/executors/celery_executor_utils.py      | 22 ++++++++-
 .../celery/executors/celery_kubernetes_executor.py |  7 +--
 .../providers/celery/executors/default_celery.py   | 24 +++++++---
 airflow/utils/providers_configuration_loader.py    | 55 ++++++++++++++++++++++
 tests/cli/test_cli_parser.py                       |  4 +-
 .../integration/executors/test_celery_executor.py  | 16 ++++++-
 .../celery/executors/test_celery_executor.py       |  8 ++--
 33 files changed, 250 insertions(+), 35 deletions(-)

diff --git a/airflow/__main__.py b/airflow/__main__.py
index 893c6bde09..c15f75b4b9 100644
--- a/airflow/__main__.py
+++ b/airflow/__main__.py
@@ -46,12 +46,13 @@ def main():
     argcomplete.autocomplete(parser)
     args = parser.parse_args()
 
-    # Here we ensure that the default configuration is written if needed 
before running any command
-    # that might need it. This used to be done during configuration 
initialization but having it
-    # in main ensures that it is not done during tests and other ways airflow 
imports are used
-    from airflow.configuration import 
write_default_airflow_configuration_if_needed
+    if args.subcommand not in ["lazy_loaded", "version"]:
+        # Here we ensure that the default configuration is written if needed 
before running any command
+        # that might need it. This used to be done during configuration 
initialization but having it
+        # in main ensures that it is not done during tests and other ways 
airflow imports are used
+        from airflow.configuration import 
write_default_airflow_configuration_if_needed
 
-    write_default_airflow_configuration_if_needed()
+        write_default_airflow_configuration_if_needed()
     args.func(args)
 
 
diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index a57a8dae1d..30e7130283 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -1872,9 +1872,9 @@ PROVIDERS_COMMANDS = (
         args=(ARG_OUTPUT, ARG_VERBOSE),
     ),
     ActionCommand(
-        name="status",
-        help="Get information about provider initialization status",
-        func=lazy_load_command("airflow.cli.commands.provider_command.status"),
+        name="lazy-loaded",
+        help="Checks that provider configuration is lazy loaded",
+        
func=lazy_load_command("airflow.cli.commands.provider_command.lazy_loaded"),
         args=(ARG_VERBOSE,),
     ),
 )
diff --git a/airflow/cli/commands/celery_command.py 
b/airflow/cli/commands/celery_command.py
index 7adbea36e6..af7dcbc607 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/cli/commands/celery_command.py
@@ -36,12 +36,14 @@ from airflow import settings
 from airflow.configuration import conf
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import setup_locations, setup_logging
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.utils.serve_logs import serve_logs
 
 WORKER_PROCESS_NAME = "worker"
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def flower(args):
     """Starts Flower, Celery monitoring tool."""
     # This needs to be imported locally to not trigger Providers Manager 
initialization
@@ -103,6 +105,7 @@ def _serve_logs(skip_serve_logs: bool = False):
 
 
 @after_setup_logger.connect()
+@providers_configuration_loaded
 def logger_setup_handler(logger, **kwargs):
     """
     Reconfigure the logger.
@@ -132,6 +135,7 @@ def logger_setup_handler(logger, **kwargs):
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def worker(args):
     """Starts Airflow Celery worker."""
     # This needs to be imported locally to not trigger Providers Manager 
initialization
@@ -239,6 +243,7 @@ def worker(args):
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def stop_worker(args):
     """Sends SIGTERM to Celery worker."""
     # Read PID from file
diff --git a/airflow/cli/commands/config_command.py 
b/airflow/cli/commands/config_command.py
index 86130dab10..4855c7dd9f 100644
--- a/airflow/cli/commands/config_command.py
+++ b/airflow/cli/commands/config_command.py
@@ -25,8 +25,10 @@ from pygments.lexers.configs import IniLexer
 from airflow.configuration import conf
 from airflow.utils.cli import should_use_colors
 from airflow.utils.code_utils import get_terminal_formatter
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 
+@providers_configuration_loaded
 def show_config(args):
     """Show current application configuration."""
     with io.StringIO() as output:
@@ -47,6 +49,7 @@ def show_config(args):
         print(code)
 
 
+@providers_configuration_loaded
 def get_value(args):
     """Get one value from configuration."""
     # while this will make get_value quite a bit slower we must initialize 
configuration
@@ -54,9 +57,6 @@ def get_value(args):
     # providers are initialized. Theoretically Providers might add new 
sections and options
     # but also override defaults for existing options, so without loading all 
providers we
     # cannot be sure what is the final value of the option.
-    from airflow.providers_manager import ProvidersManager
-
-    ProvidersManager().initialize_providers_configuration()
     if not conf.has_option(args.section, args.option):
         raise SystemExit(f"The option [{args.section}/{args.option}] is not 
found in config.")
 
diff --git a/airflow/cli/commands/connection_command.py 
b/airflow/cli/commands/connection_command.py
index e7b83e342e..5db123f004 100644
--- a/airflow/cli/commands/connection_command.py
+++ b/airflow/cli/commands/connection_command.py
@@ -39,6 +39,7 @@ from airflow.providers_manager import ProvidersManager
 from airflow.secrets.local_filesystem import load_connections_dict
 from airflow.utils import cli as cli_utils, helpers, yaml
 from airflow.utils.cli import suppress_logs_and_warning
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.utils.session import create_session
 
 
@@ -61,6 +62,7 @@ def _connection_mapper(conn: Connection) -> dict[str, Any]:
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def connections_get(args):
     """Get a connection."""
     try:
@@ -75,6 +77,7 @@ def connections_get(args):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def connections_list(args):
     """Lists all connections at the command line."""
     with create_session() as session:
@@ -150,6 +153,7 @@ def _get_connection_types() -> list[str]:
     return _connection_types
 
 
+@providers_configuration_loaded
 def connections_export(args):
     """Exports all connections to a file."""
     file_formats = [".yaml", ".json", ".env"]
@@ -200,6 +204,7 @@ alternative_conn_specs = ["conn_type", "conn_host", 
"conn_login", "conn_password
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def connections_add(args):
     """Adds new connection."""
     has_uri = bool(args.conn_uri)
@@ -291,6 +296,7 @@ def connections_add(args):
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def connections_delete(args):
     """Deletes connection from DB."""
     with create_session() as session:
@@ -306,6 +312,7 @@ def connections_delete(args):
 
 
 @cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
 def connections_import(args):
     """Imports connections from a file."""
     if os.path.exists(args.file):
@@ -343,6 +350,7 @@ def _import_helper(file_path: str, overwrite: bool) -> None:
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def connections_test(args) -> None:
     """Test an Airflow connection."""
     console = AirflowConsole()
diff --git a/airflow/cli/commands/dag_command.py 
b/airflow/cli/commands/dag_command.py
index 66decad78b..f965e24b5e 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -45,6 +45,7 @@ from airflow.timetables.base import DataInterval
 from airflow.utils import cli as cli_utils, timezone
 from airflow.utils.cli import get_dag, get_dags, process_subdir, 
sigint_handler, suppress_logs_and_warning
 from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
 from airflow.utils.state import DagRunState
 
@@ -120,6 +121,7 @@ def _run_dag_backfill(dags: list[DAG], args) -> None:
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def dag_backfill(args, dag: list[DAG] | DAG | None = None) -> None:
     """Creates backfill job or dry run for a DAG or list of DAGs using 
regex."""
     logging.basicConfig(level=settings.LOGGING_LEVEL, 
format=settings.SIMPLE_LOG_FORMAT)
@@ -150,6 +152,7 @@ def dag_backfill(args, dag: list[DAG] | DAG | None = None) 
-> None:
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def dag_trigger(args) -> None:
     """Creates a dag run for the specified dag."""
     api_client = get_current_api_client()
@@ -170,6 +173,7 @@ def dag_trigger(args) -> None:
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def dag_delete(args) -> None:
     """Deletes all DB records related to the specified dag."""
     api_client = get_current_api_client()
@@ -188,17 +192,20 @@ def dag_delete(args) -> None:
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def dag_pause(args) -> None:
     """Pauses a DAG."""
     set_is_paused(True, args)
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def dag_unpause(args) -> None:
     """Unpauses a DAG."""
     set_is_paused(False, args)
 
 
+@providers_configuration_loaded
 def set_is_paused(is_paused: bool, args) -> None:
     """Sets is_paused for DAG by a given dag_id."""
     dag = DagModel.get_dagmodel(args.dag_id)
@@ -211,6 +218,7 @@ def set_is_paused(is_paused: bool, args) -> None:
     print(f"Dag: {args.dag_id}, paused: {is_paused}")
 
 
+@providers_configuration_loaded
 def dag_dependencies_show(args) -> None:
     """Displays DAG dependencies, save to file or show as imgcat image."""
     dot = render_dag_dependencies(SerializedDagModel.get_dag_dependencies())
@@ -230,6 +238,7 @@ def dag_dependencies_show(args) -> None:
         print(dot.source)
 
 
+@providers_configuration_loaded
 def dag_show(args) -> None:
     """Displays DAG or saves it's graphic representation to the file."""
     dag = get_dag(args.subdir, args.dag_id)
@@ -273,6 +282,7 @@ def _save_dot_to_file(dot: Dot, filename: str) -> None:
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 @provide_session
 def dag_state(args, session: Session = NEW_SESSION) -> None:
     """
@@ -296,6 +306,7 @@ def dag_state(args, session: Session = NEW_SESSION) -> None:
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def dag_next_execution(args) -> None:
     """
     Returns the next execution datetime of a DAG at the command line.
@@ -335,6 +346,7 @@ def dag_next_execution(args) -> None:
 
 @cli_utils.action_cli
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def dag_list_dags(args) -> None:
     """Displays dags with or without stats at the command line."""
     dagbag = DagBag(process_subdir(args.subdir))
@@ -360,6 +372,7 @@ def dag_list_dags(args) -> None:
 
 @cli_utils.action_cli
 @suppress_logs_and_warning
+@providers_configuration_loaded
 @provide_session
 def dag_details(args, session=NEW_SESSION):
     """Get DAG details given a DAG id."""
@@ -381,6 +394,7 @@ def dag_details(args, session=NEW_SESSION):
 
 @cli_utils.action_cli
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def dag_list_import_errors(args) -> None:
     """Displays dags with import errors on the command line."""
     dagbag = DagBag(process_subdir(args.subdir))
@@ -395,6 +409,7 @@ def dag_list_import_errors(args) -> None:
 
 @cli_utils.action_cli
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def dag_report(args) -> None:
     """Displays dagbag stats at the command line."""
     dagbag = DagBag(process_subdir(args.subdir))
@@ -413,6 +428,7 @@ def dag_report(args) -> None:
 
 @cli_utils.action_cli
 @suppress_logs_and_warning
+@providers_configuration_loaded
 @provide_session
 def dag_list_jobs(args, dag: DAG | None = None, session: Session = 
NEW_SESSION) -> None:
     """Lists latest n jobs."""
@@ -443,6 +459,7 @@ def dag_list_jobs(args, dag: DAG | None = None, session: 
Session = NEW_SESSION)
 
 @cli_utils.action_cli
 @suppress_logs_and_warning
+@providers_configuration_loaded
 @provide_session
 def dag_list_dag_runs(args, dag: DAG | None = None, session: Session = 
NEW_SESSION) -> None:
     """Lists dag runs for a given DAG."""
@@ -479,8 +496,9 @@ def dag_list_dag_runs(args, dag: DAG | None = None, 
session: Session = NEW_SESSI
     )
 
 
-@provide_session
 @cli_utils.action_cli
+@providers_configuration_loaded
+@provide_session
 def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> 
None:
     """Execute one single DagRun for a given DAG and execution date."""
     run_conf = None
@@ -513,8 +531,9 @@ def dag_test(args, dag: DAG | None = None, session: Session 
= NEW_SESSION) -> No
             print(dot_graph.source)
 
 
-@provide_session
 @cli_utils.action_cli
+@providers_configuration_loaded
+@provide_session
 def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
     """Serialize a DAG instance."""
     
session.execute(delete(SerializedDagModel).execution_options(synchronize_session=False))
diff --git a/airflow/cli/commands/dag_processor_command.py 
b/airflow/cli/commands/dag_processor_command.py
index 70bebf285b..757bd778cc 100644
--- a/airflow/cli/commands/dag_processor_command.py
+++ b/airflow/cli/commands/dag_processor_command.py
@@ -31,6 +31,7 @@ from airflow.jobs.dag_processor_job_runner import 
DagProcessorJobRunner
 from airflow.jobs.job import Job, run_job
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import setup_locations, setup_logging
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 log = logging.getLogger(__name__)
 
@@ -53,6 +54,7 @@ def _create_dag_processor_job_runner(args: Any) -> 
DagProcessorJobRunner:
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def dag_processor(args):
     """Starts Airflow Dag Processor Job."""
     if not conf.getboolean("scheduler", "standalone_dag_processor"):
diff --git a/airflow/cli/commands/db_command.py 
b/airflow/cli/commands/db_command.py
index 64d54cc22e..e703f6271e 100644
--- a/airflow/cli/commands/db_command.py
+++ b/airflow/cli/commands/db_command.py
@@ -31,10 +31,12 @@ from airflow.utils import cli as cli_utils, db
 from airflow.utils.db import REVISION_HEADS_MAP
 from airflow.utils.db_cleanup import config_dict, drop_archived_tables, 
export_archived_records, run_cleanup
 from airflow.utils.process_utils import execute_interactive
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 log = logging.getLogger(__name__)
 
 
+@providers_configuration_loaded
 def initdb(args):
     """Initializes the metadata database."""
     print("DB: " + repr(settings.engine.url))
@@ -42,6 +44,7 @@ def initdb(args):
     print("Initialization done")
 
 
+@providers_configuration_loaded
 def resetdb(args):
     """Resets the metadata database."""
     print("DB: " + repr(settings.engine.url))
@@ -51,6 +54,7 @@ def resetdb(args):
 
 
 @cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
 def upgradedb(args):
     """Upgrades the metadata database."""
     print("DB: " + repr(settings.engine.url))
@@ -96,6 +100,7 @@ def upgradedb(args):
 
 
 @cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
 def downgrade(args):
     """Downgrades the metadata database."""
     if args.to_revision and args.to_version:
@@ -142,12 +147,14 @@ def downgrade(args):
         raise SystemExit("Cancelled")
 
 
+@providers_configuration_loaded
 def check_migrations(args):
     """Function to wait for all airflow migrations to complete. Used for 
launching airflow in k8s."""
     db.check_migrations(timeout=args.migration_wait_timeout)
 
 
 @cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
 def shell(args):
     """Run a shell that allows to access metadata database."""
     url = settings.engine.url
@@ -191,6 +198,7 @@ def shell(args):
 
 
 @cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
 def check(args):
     """Runs a check command that checks if db is available."""
     retries: int = args.retry
@@ -215,6 +223,7 @@ all_tables = sorted(config_dict)
 
 
 @cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
 def cleanup_tables(args):
     """Purges old records in metadata database."""
     run_cleanup(
@@ -228,6 +237,7 @@ def cleanup_tables(args):
 
 
 @cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
 def export_archived(args):
     """Exports archived records from metadata database."""
     export_archived_records(
@@ -240,6 +250,7 @@ def export_archived(args):
 
 
 @cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
 def drop_archived(args):
     """Drops archived tables from metadata database."""
     drop_archived_tables(
diff --git a/airflow/cli/commands/info_command.py 
b/airflow/cli/commands/info_command.py
index 7261dfc484..2e60d80b27 100644
--- a/airflow/cli/commands/info_command.py
+++ b/airflow/cli/commands/info_command.py
@@ -35,6 +35,7 @@ from airflow.providers_manager import ProvidersManager
 from airflow.typing_compat import Protocol
 from airflow.utils.cli import suppress_logs_and_warning
 from airflow.utils.platform import getuser
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.version import version as airflow_version
 
 log = logging.getLogger(__name__)
@@ -378,6 +379,7 @@ def _send_report_to_fileio(info):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def show_info(args):
     """Show information related to Airflow, system and other."""
     # Enforce anonymization, when file_io upload is tuned on.
diff --git a/airflow/cli/commands/internal_api_command.py 
b/airflow/cli/commands/internal_api_command.py
index 3d8f205bd1..72fe57c206 100644
--- a/airflow/cli/commands/internal_api_command.py
+++ b/airflow/cli/commands/internal_api_command.py
@@ -47,6 +47,7 @@ from airflow.models import import_all_models
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import setup_locations, setup_logging
 from airflow.utils.process_utils import check_if_pidfile_process_is_running
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.www.extensions.init_dagbag import init_dagbag
 from airflow.www.extensions.init_jinja_globals import init_jinja_globals
 from airflow.www.extensions.init_manifest_files import configure_manifest_files
@@ -58,6 +59,7 @@ app: Flask | None = None
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def internal_api(args):
     """Starts Airflow Internal API."""
     print(settings.HEADER)
diff --git a/airflow/cli/commands/jobs_command.py 
b/airflow/cli/commands/jobs_command.py
index bcdd6df475..b6509ea642 100644
--- a/airflow/cli/commands/jobs_command.py
+++ b/airflow/cli/commands/jobs_command.py
@@ -21,10 +21,12 @@ from sqlalchemy.orm import Session
 
 from airflow.jobs.job import Job
 from airflow.utils.net import get_hostname
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import JobState
 
 
+@providers_configuration_loaded
 @provide_session
 def check(args, session: Session = NEW_SESSION) -> None:
     """Checks if job(s) are still alive."""
diff --git a/airflow/cli/commands/kerberos_command.py 
b/airflow/cli/commands/kerberos_command.py
index 4bbe3f6df9..4dd63d52eb 100644
--- a/airflow/cli/commands/kerberos_command.py
+++ b/airflow/cli/commands/kerberos_command.py
@@ -24,9 +24,11 @@ from airflow import settings
 from airflow.security import kerberos as krb
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import setup_locations
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def kerberos(args):
     """Start a kerberos ticket renewer."""
     print(settings.HEADER)
diff --git a/airflow/cli/commands/kubernetes_command.py 
b/airflow/cli/commands/kubernetes_command.py
index c367d4be87..1555f7be92 100644
--- a/airflow/cli/commands/kubernetes_command.py
+++ b/airflow/cli/commands/kubernetes_command.py
@@ -33,9 +33,11 @@ from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.models import DagRun, TaskInstance
 from airflow.utils import cli as cli_utils, yaml
 from airflow.utils.cli import get_dag
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def generate_pod_yaml(args):
     """Generates yaml files for each task in the DAG. Used for testing output 
of KubernetesExecutor."""
     execution_date = args.execution_date
@@ -71,6 +73,7 @@ def generate_pod_yaml(args):
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def cleanup_pods(args):
     """Clean up k8s pods in evicted/failed/succeeded/pending states."""
     namespace = args.namespace
diff --git a/airflow/cli/commands/plugins_command.py 
b/airflow/cli/commands/plugins_command.py
index 50ee583099..29dd75674a 100644
--- a/airflow/cli/commands/plugins_command.py
+++ b/airflow/cli/commands/plugins_command.py
@@ -23,6 +23,7 @@ from airflow import plugins_manager
 from airflow.cli.simple_table import AirflowConsole
 from airflow.plugins_manager import PluginsDirectorySource, get_plugin_info
 from airflow.utils.cli import suppress_logs_and_warning
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 
 def _get_name(class_like_object) -> str:
@@ -39,6 +40,7 @@ def _join_plugins_names(value: list[Any] | Any) -> str:
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def dump_plugins(args):
     """Dump plugins information."""
     plugins_info: list[dict[str, str]] = get_plugin_info()
diff --git a/airflow/cli/commands/pool_command.py 
b/airflow/cli/commands/pool_command.py
index aa56ba8fea..8d9e206f1b 100644
--- a/airflow/cli/commands/pool_command.py
+++ b/airflow/cli/commands/pool_command.py
@@ -27,6 +27,7 @@ from airflow.cli.simple_table import AirflowConsole
 from airflow.exceptions import PoolNotFound
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import suppress_logs_and_warning
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 
 def _show_pools(pools, output):
@@ -42,6 +43,7 @@ def _show_pools(pools, output):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def pool_list(args):
     """Displays info of all the pools."""
     api_client = get_current_api_client()
@@ -50,6 +52,7 @@ def pool_list(args):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def pool_get(args):
     """Displays pool info by a given name."""
     api_client = get_current_api_client()
@@ -62,6 +65,7 @@ def pool_get(args):
 
 @cli_utils.action_cli
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def pool_set(args):
     """Creates new pool with a given name and slots."""
     api_client = get_current_api_client()
@@ -71,6 +75,7 @@ def pool_set(args):
 
 @cli_utils.action_cli
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def pool_delete(args):
     """Deletes pool by a given name."""
     api_client = get_current_api_client()
@@ -83,6 +88,7 @@ def pool_delete(args):
 
 @cli_utils.action_cli
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def pool_import(args):
     """Imports pools from the file."""
     if not os.path.exists(args.file):
@@ -93,6 +99,7 @@ def pool_import(args):
     print(f"Uploaded {len(pools)} pool(s)")
 
 
+@providers_configuration_loaded
 def pool_export(args):
     """Exports all the pools to the file."""
     pools = pool_export_helper(args.file)
diff --git a/airflow/cli/commands/provider_command.py 
b/airflow/cli/commands/provider_command.py
index adcb1d18fa..a55032d9f2 100644
--- a/airflow/cli/commands/provider_command.py
+++ b/airflow/cli/commands/provider_command.py
@@ -24,6 +24,7 @@ import re2
 from airflow.cli.simple_table import AirflowConsole
 from airflow.providers_manager import ProvidersManager
 from airflow.utils.cli import suppress_logs_and_warning
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 ERROR_IMPORTING_HOOK = "Error when importing hook!"
 
@@ -33,6 +34,7 @@ def _remove_rst_syntax(value: str) -> str:
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def provider_get(args):
     """Get a provider info."""
     providers = ProvidersManager().providers
@@ -54,6 +56,7 @@ def provider_get(args):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def providers_list(args):
     """Lists all providers at the command line."""
     AirflowConsole().print_as(
@@ -68,6 +71,7 @@ def providers_list(args):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def hooks_list(args):
     """Lists all hooks at the command line."""
     AirflowConsole().print_as(
@@ -84,6 +88,7 @@ def hooks_list(args):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def triggers_list(args):
     AirflowConsole().print_as(
         data=ProvidersManager().trigger,
@@ -97,6 +102,7 @@ def triggers_list(args):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def connection_form_widget_list(args):
     """Lists all custom connection form fields at the command line."""
     AirflowConsole().print_as(
@@ -112,6 +118,7 @@ def connection_form_widget_list(args):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def connection_field_behaviours(args):
     """Lists field behaviours."""
     AirflowConsole().print_as(
@@ -124,6 +131,7 @@ def connection_field_behaviours(args):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def extra_links_list(args):
     """Lists all extra links at the command line."""
     AirflowConsole().print_as(
@@ -136,6 +144,7 @@ def extra_links_list(args):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def logging_list(args):
     """Lists all log task handlers at the command line."""
     AirflowConsole().print_as(
@@ -148,6 +157,7 @@ def logging_list(args):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def secrets_backends_list(args):
     """Lists all secrets backends at the command line."""
     AirflowConsole().print_as(
@@ -160,6 +170,7 @@ def secrets_backends_list(args):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def auth_backend_list(args):
     """Lists all API auth backend modules at the command line."""
     AirflowConsole().print_as(
@@ -172,6 +183,7 @@ def auth_backend_list(args):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def executors_list(args):
     """Lists all executors at the command line."""
     AirflowConsole().print_as(
@@ -184,6 +196,7 @@ def executors_list(args):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def config_list(args):
     """Lists all configurations at the command line."""
     AirflowConsole().print_as(
@@ -196,8 +209,8 @@ def config_list(args):
 
 
 @suppress_logs_and_warning
-def status(args):
-    """Informs if providers manager has been initialized.
+def lazy_loaded(args):
+    """Informs if providers manager has been initialized too early.
 
     If provider is initialized, shows the stack trace and exit with error code 
1.
     """
diff --git a/airflow/cli/commands/role_command.py 
b/airflow/cli/commands/role_command.py
index 91fa267429..db11d69dd2 100644
--- a/airflow/cli/commands/role_command.py
+++ b/airflow/cli/commands/role_command.py
@@ -26,11 +26,13 @@ import os
 from airflow.cli.simple_table import AirflowConsole
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import suppress_logs_and_warning
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.www.fab_security.sqla.models import Action, Permission, Resource, 
Role
 from airflow.www.security import EXISTING_ROLES
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def roles_list(args):
     """Lists all existing roles."""
     from airflow.utils.cli_app_builder import get_application_builder
@@ -58,6 +60,7 @@ def roles_list(args):
 
 @cli_utils.action_cli
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def roles_create(args):
     """Creates new empty role in DB."""
     from airflow.utils.cli_app_builder import get_application_builder
@@ -70,6 +73,7 @@ def roles_create(args):
 
 @cli_utils.action_cli
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def roles_delete(args):
     """Deletes role in DB."""
     from airflow.utils.cli_app_builder import get_application_builder
@@ -138,6 +142,7 @@ def __roles_add_or_remove_permissions(args):
 
 @cli_utils.action_cli
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def roles_add_perms(args):
     """Adds permissions to role in DB."""
     __roles_add_or_remove_permissions(args)
@@ -145,12 +150,14 @@ def roles_add_perms(args):
 
 @cli_utils.action_cli
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def roles_del_perms(args):
     """Deletes permissions from role in DB."""
     __roles_add_or_remove_permissions(args)
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def roles_export(args):
     """
     Exports all the roles from the database to a file.
diff --git a/airflow/cli/commands/rotate_fernet_key_command.py 
b/airflow/cli/commands/rotate_fernet_key_command.py
index e9973978e0..1a47a29e31 100644
--- a/airflow/cli/commands/rotate_fernet_key_command.py
+++ b/airflow/cli/commands/rotate_fernet_key_command.py
@@ -21,10 +21,12 @@ from sqlalchemy import select
 
 from airflow.models import Connection, Variable
 from airflow.utils import cli as cli_utils
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.utils.session import create_session
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def rotate_fernet_key(args):
     """Rotates all encrypted connection credentials and variables."""
     with create_session() as session:
diff --git a/airflow/cli/commands/scheduler_command.py 
b/airflow/cli/commands/scheduler_command.py
index 22f9a75808..808645282c 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -32,6 +32,7 @@ from airflow.jobs.job import Job, run_job
 from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import process_subdir, setup_locations, setup_logging, 
sigint_handler, sigquit_handler
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.utils.scheduler_health import serve_health_check
 
 
@@ -43,6 +44,7 @@ def _run_scheduler_job(job_runner: SchedulerJobRunner, *, 
skip_serve_logs: bool)
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def scheduler(args):
     """Starts Airflow Scheduler."""
     print(settings.HEADER)
diff --git a/airflow/cli/commands/standalone_command.py 
b/airflow/cli/commands/standalone_command.py
index 9bdd49a82a..68abfdd000 100644
--- a/airflow/cli/commands/standalone_command.py
+++ b/airflow/cli/commands/standalone_command.py
@@ -35,6 +35,7 @@ from airflow.jobs.job import most_recent_job
 from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
 from airflow.jobs.triggerer_job_runner import TriggererJobRunner
 from airflow.utils import db
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 
 class StandaloneCommand:
@@ -56,6 +57,7 @@ class StandaloneCommand:
         self.ready_time = None
         self.ready_delay = 3
 
+    @providers_configuration_loaded
     def run(self):
         """Main run loop."""
         self.print_output("standalone", "Starting Airflow Standalone")
diff --git a/airflow/cli/commands/sync_perm_command.py 
b/airflow/cli/commands/sync_perm_command.py
index 7ae3406967..ab458b2d93 100644
--- a/airflow/cli/commands/sync_perm_command.py
+++ b/airflow/cli/commands/sync_perm_command.py
@@ -19,9 +19,11 @@
 from __future__ import annotations
 
 from airflow.utils import cli as cli_utils
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def sync_perm(args):
     """Updates permissions for existing roles and DAGs."""
     from airflow.utils.cli_app_builder import get_application_builder
diff --git a/airflow/cli/commands/task_command.py 
b/airflow/cli/commands/task_command.py
index 796d3a8cf1..da47e8dd8b 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -65,6 +65,7 @@ from airflow.utils.log.file_task_handler import 
_set_task_deferred_context_var
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.log.secrets_masker import RedactedIO
 from airflow.utils.net import get_hostname
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
 from airflow.utils.state import DagRunState
 
@@ -438,6 +439,7 @@ def task_run(args, dag: DAG | None = None) -> 
TaskReturnCode | None:
 
 
 @cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
 def task_failed_deps(args) -> None:
     """
     Get task instance dependencies that were not met.
@@ -468,6 +470,7 @@ def task_failed_deps(args) -> None:
 
 @cli_utils.action_cli(check_db=False)
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def task_state(args) -> None:
     """
     Returns the state of a TaskInstance at the command line.
@@ -483,6 +486,7 @@ def task_state(args) -> None:
 
 @cli_utils.action_cli(check_db=False)
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def task_list(args, dag: DAG | None = None) -> None:
     """Lists the tasks within a DAG at the command line."""
     dag = dag or get_dag(args.subdir, args.dag_id)
@@ -530,6 +534,7 @@ def _guess_debugger() -> _SupportedDebugger:
 
 @cli_utils.action_cli(check_db=False)
 @suppress_logs_and_warning
+@providers_configuration_loaded
 @provide_session
 def task_states_for_dag_run(args, session: Session = NEW_SESSION) -> None:
     """Get the status of all task instances in a DagRun."""
@@ -631,6 +636,7 @@ def task_test(args, dag: DAG | None = None) -> None:
 
 @cli_utils.action_cli(check_db=False)
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def task_render(args, dag: DAG | None = None) -> None:
     """Renders and displays templated fields for a given task."""
     if not dag:
@@ -653,6 +659,7 @@ def task_render(args, dag: DAG | None = None) -> None:
 
 
 @cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
 def task_clear(args) -> None:
     """Clears all task instances or only those matched by regex for a 
DAG(s)."""
     logging.basicConfig(level=settings.LOGGING_LEVEL, 
format=settings.SIMPLE_LOG_FORMAT)
diff --git a/airflow/cli/commands/triggerer_command.py 
b/airflow/cli/commands/triggerer_command.py
index aa06d641c7..c7d0827bd8 100644
--- a/airflow/cli/commands/triggerer_command.py
+++ b/airflow/cli/commands/triggerer_command.py
@@ -32,6 +32,7 @@ from airflow.jobs.job import Job, run_job
 from airflow.jobs.triggerer_job_runner import TriggererJobRunner
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, 
sigquit_handler
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.utils.serve_logs import serve_logs
 
 
@@ -51,6 +52,7 @@ def _serve_logs(skip_serve_logs: bool = False) -> 
Generator[None, None, None]:
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def triggerer(args):
     """Starts Airflow Triggerer."""
     settings.MASK_SECRETS_IN_LOGS = True
diff --git a/airflow/cli/commands/user_command.py 
b/airflow/cli/commands/user_command.py
index a5a5be9787..1553d27a01 100644
--- a/airflow/cli/commands/user_command.py
+++ b/airflow/cli/commands/user_command.py
@@ -32,6 +32,7 @@ from marshmallow.exceptions import ValidationError
 from airflow.cli.simple_table import AirflowConsole
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import suppress_logs_and_warning
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 
 class UserSchema(Schema):
@@ -46,6 +47,7 @@ class UserSchema(Schema):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def users_list(args):
     """Lists users at the command line."""
     from airflow.utils.cli_app_builder import get_application_builder
@@ -60,6 +62,7 @@ def users_list(args):
 
 
 @cli_utils.action_cli(check_db=True)
+@providers_configuration_loaded
 def users_create(args):
     """Creates new user in the DB."""
     from airflow.utils.cli_app_builder import get_application_builder
@@ -108,6 +111,7 @@ def _find_user(args):
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def users_delete(args):
     """Deletes user from DB."""
     user = _find_user(args)
@@ -125,6 +129,7 @@ def users_delete(args):
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def users_manage_role(args, remove=False):
     """Deletes or appends user roles."""
     user = _find_user(args)
@@ -153,6 +158,7 @@ def users_manage_role(args, remove=False):
             print(f'User "{user.username}" added to role "{args.role}"')
 
 
+@providers_configuration_loaded
 def users_export(args):
     """Exports all users to the json file."""
     from airflow.utils.cli_app_builder import get_application_builder
@@ -182,6 +188,7 @@ def users_export(args):
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def users_import(args):
     """Imports users from the json file."""
     json_file = getattr(args, "import")
diff --git a/airflow/cli/commands/variable_command.py 
b/airflow/cli/commands/variable_command.py
index 32f6b0c198..34b46530d5 100644
--- a/airflow/cli/commands/variable_command.py
+++ b/airflow/cli/commands/variable_command.py
@@ -28,10 +28,12 @@ from airflow.cli.simple_table import AirflowConsole
 from airflow.models import Variable
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import suppress_logs_and_warning
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.utils.session import create_session
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def variables_list(args):
     """Displays all the variables."""
     with create_session() as session:
@@ -40,6 +42,7 @@ def variables_list(args):
 
 
 @suppress_logs_and_warning
+@providers_configuration_loaded
 def variables_get(args):
     """Displays variable by a given name."""
     try:
@@ -54,6 +57,7 @@ def variables_get(args):
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def variables_set(args):
     """Creates new variable with a given name and value."""
     Variable.set(args.key, args.value, serialize_json=args.json)
@@ -61,6 +65,7 @@ def variables_set(args):
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def variables_delete(args):
     """Deletes variable by a given name."""
     Variable.delete(args.key)
@@ -68,6 +73,7 @@ def variables_delete(args):
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def variables_import(args):
     """Imports variables from a given file."""
     if os.path.exists(args.file):
@@ -76,6 +82,7 @@ def variables_import(args):
         raise SystemExit("Missing variables file.")
 
 
+@providers_configuration_loaded
 def variables_export(args):
     """Exports all the variables to the file."""
     _variable_export_helper(args.file)
diff --git a/airflow/cli/commands/webserver_command.py 
b/airflow/cli/commands/webserver_command.py
index f5d44e5058..5399f8ba64 100644
--- a/airflow/cli/commands/webserver_command.py
+++ b/airflow/cli/commands/webserver_command.py
@@ -42,6 +42,7 @@ from airflow.utils.cli import setup_locations, setup_logging
 from airflow.utils.hashlib_wrapper import md5
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.process_utils import check_if_pidfile_process_is_running
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 log = logging.getLogger(__name__)
 
@@ -320,6 +321,7 @@ class GunicornMonitor(LoggingMixin):
 
 
 @cli_utils.action_cli
+@providers_configuration_loaded
 def webserver(args):
     """Starts Airflow Webserver."""
     print(settings.HEADER)
diff --git a/airflow/providers/celery/executors/celery_executor_utils.py 
b/airflow/providers/celery/executors/celery_executor_utils.py
index 1c02dbb57b..e970174a65 100644
--- a/airflow/providers/celery/executors/celery_executor_utils.py
+++ b/airflow/providers/celery/executors/celery_executor_utils.py
@@ -57,18 +57,36 @@ if TYPE_CHECKING:
 
     TaskInstanceInCelery = Tuple[TaskInstanceKey, CommandType, Optional[str], 
Task]
 
+# IMPORTANT NOTE! Celery Executor has initialization done dynamically and it 
performs initialization when
+# it is imported, so we need fallbacks here in order to be able to import the 
class directly without
+# having configuration initialized before. Do not remove those fallbacks!
+#
+# This is not strictly needed for production:
+#
+#   * for Airflow 2.6 and before the defaults will come from the core defaults
+#   * for Airflow 2.7+ the defaults will be loaded via ProvidersManager
+#
+# But it helps in our tests to import the executor class and validate if the 
celery code can be imported
+# in the current and older versions of Airflow.
+
 OPERATION_TIMEOUT = conf.getfloat("celery", "operation_timeout", fallback=1.0)
 
 # Make it constant for unit test.
 CELERY_FETCH_ERR_MSG_HEADER = "Error fetching Celery task state"
 
 if conf.has_option("celery", "celery_config_options"):
-    celery_configuration = conf.getimport("celery", "celery_config_options")
+    celery_configuration = conf.getimport(
+        "celery",
+        "celery_config_options",
+        
fallback="airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG",
+    )
 
 else:
     celery_configuration = DEFAULT_CELERY_CONFIG
 
-celery_app_name = conf.get("celery", "CELERY_APP_NAME")
+celery_app_name = conf.get(
+    "celery", "CELERY_APP_NAME", 
fallback="airflow.providers.celery.executors.celery_executor"
+)
 if celery_app_name == "airflow.executors.celery_executor":
     warnings.warn(
         "The celery.CELERY_APP_NAME configuration uses deprecated package 
name: "
diff --git a/airflow/providers/celery/executors/celery_kubernetes_executor.py 
b/airflow/providers/celery/executors/celery_kubernetes_executor.py
index 72c037b684..d79b3cd1db 100644
--- a/airflow/providers/celery/executors/celery_kubernetes_executor.py
+++ b/airflow/providers/celery/executors/celery_kubernetes_executor.py
@@ -26,6 +26,7 @@ from airflow.configuration import conf
 from airflow.executors.kubernetes_executor import KubernetesExecutor
 from airflow.providers.celery.executors.celery_executor import CeleryExecutor
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 if TYPE_CHECKING:
     from airflow.executors.base_executor import CommandType, 
EventBufferValueType, QueuedTaskInstanceType
@@ -57,12 +58,8 @@ class CeleryKubernetesExecutor(LoggingMixin):
     callback_sink: BaseCallbackSink | None = None
 
     @cached_property
+    @providers_configuration_loaded
     def kubernetes_queue(self) -> str:
-        # lazily retrieve the value of kubernetes_queue from the configuration
-        # because it might need providers
-        from airflow.providers_manager import ProvidersManager
-
-        ProvidersManager().initialize_providers_configuration()
         return conf.get("celery_kubernetes_executor", "kubernetes_queue")
 
     def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: 
KubernetesExecutor):
diff --git a/airflow/providers/celery/executors/default_celery.py 
b/airflow/providers/celery/executors/default_celery.py
index f5c23cbb81..6af4c1cd20 100644
--- a/airflow/providers/celery/executors/default_celery.py
+++ b/airflow/providers/celery/executors/default_celery.py
@@ -25,7 +25,6 @@ import re2
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException, AirflowException
-from airflow.providers_manager import ProvidersManager
 
 
 def _broker_supports_visibility_timeout(url):
@@ -34,8 +33,19 @@ def _broker_supports_visibility_timeout(url):
 
 log = logging.getLogger(__name__)
 
-ProvidersManager().initialize_providers_configuration()
-broker_url = conf.get("celery", "BROKER_URL")
+# IMPORTANT NOTE! Celery Executor has initialization done dynamically and it 
performs initialization when
+# it is imported, so we need fallbacks here in order to be able to import the 
class directly without
+# having configuration initialized before. Do not remove those fallbacks!
+#
+# This is not strictly needed for production:
+#
+#   * for Airflow 2.6 and before the defaults will come from the core defaults
+#   * for Airflow 2.7+ the defaults will be loaded via ProvidersManager
+#
+# But it helps in our tests to import the executor class and validate if the 
celery code can be imported
+# in the current and older versions of Airflow.
+
+broker_url = conf.get("celery", "BROKER_URL", fallback="redis://redis:6379/0")
 
 broker_transport_options = conf.getsection("celery_broker_transport_options") 
or {}
 if "visibility_timeout" not in broker_transport_options:
@@ -61,19 +71,19 @@ else:
 DEFAULT_CELERY_CONFIG = {
     "accept_content": ["json"],
     "event_serializer": "json",
-    "worker_prefetch_multiplier": conf.getint("celery", 
"worker_prefetch_multiplier"),
+    "worker_prefetch_multiplier": conf.getint("celery", 
"worker_prefetch_multiplier", fallback=1),
     "task_acks_late": True,
     "task_default_queue": conf.get("operators", "DEFAULT_QUEUE"),
     "task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"),
-    "task_track_started": conf.getboolean("celery", "task_track_started"),
+    "task_track_started": conf.getboolean("celery", "task_track_started", 
fallback=True),
     "broker_url": broker_url,
     "broker_transport_options": broker_transport_options_for_celery,
     "result_backend": result_backend,
     "database_engine_options": conf.getjson(
         "celery", "result_backend_sqlalchemy_engine_options", fallback={}
     ),
-    "worker_concurrency": conf.getint("celery", "WORKER_CONCURRENCY"),
-    "worker_enable_remote_control": conf.getboolean("celery", 
"worker_enable_remote_control"),
+    "worker_concurrency": conf.getint("celery", "WORKER_CONCURRENCY", 
fallback=16),
+    "worker_enable_remote_control": conf.getboolean("celery", 
"worker_enable_remote_control", fallback=True),
 }
 
 
diff --git a/airflow/utils/providers_configuration_loader.py 
b/airflow/utils/providers_configuration_loader.py
new file mode 100644
index 0000000000..df7f9d3175
--- /dev/null
+++ b/airflow/utils/providers_configuration_loader.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from functools import wraps
+from typing import Callable, TypeVar, cast
+
+T = TypeVar("T", bound=Callable)
+
+
+def providers_configuration_loaded(func: T) -> T:
+    """
+    Decorator that makes sure that providers configuration is loaded before 
actually calling
+    the decorated function.
+
+    ProvidersManager initialization of configuration is relatively inexpensive 
- it walks through
+    all providers' entrypoints, retrieves the provider_info and loads config 
yaml parts of the get_info.
+    Unlike initialization of hooks and operators it does not import any of the 
provider's code, so it can
+    be run quickly by all commands that need to access providers 
configuration. We cannot even import
+    ProvidersManager while importing any of the commands, so we need to 
locally import it here.
+
+    We cannot initialize the configuration in settings/conf because of the way 
conf/settings are used
+    internally - they are loaded while importing airflow, and we need to 
access airflow version conf in the
+    ProvidersManager initialization, so instead we opt for decorating all the 
methods that need it with this
+    decorator.
+
+    The decorator should be placed below @suppress_logs_and_warning but above 
@provide_session in order to
+    avoid spoiling the output of formatted options with some warnings or 
infos, and to be prepared that
+    session creation might need some configuration defaults from the providers 
configuration.
+
+    :param func: function to make sure that providers configuration is loaded 
before actually calling
+    """
+
+    @wraps(func)
+    def wrapped_function(*args, **kwargs):
+        from airflow.providers_manager import ProvidersManager
+
+        ProvidersManager().initialize_providers_configuration()
+        return func(*args, **kwargs)
+
+    return cast(T, wrapped_function)
diff --git a/tests/cli/test_cli_parser.py b/tests/cli/test_cli_parser.py
index e770e19416..6089961de9 100644
--- a/tests/cli/test_cli_parser.py
+++ b/tests/cli/test_cli_parser.py
@@ -324,7 +324,7 @@ class TestCliSubprocess:
         CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True)
         CONFIG_FILE.touch(exist_ok=True)
         result = subprocess.run(
-            [sys.executable, "-m", "airflow", "providers", "status"],
+            [sys.executable, "-m", "airflow", "providers", "lazy-loaded"],
             env={"PYTHONPATH": os.pathsep.join(sys.path)},
             check=False,
             text=True,
@@ -339,7 +339,7 @@ class TestCliSubprocess:
         """
         CONFIG_FILE.unlink(missing_ok=True)
         result = subprocess.run(
-            [sys.executable, "-m", "airflow", "version"],
+            [sys.executable, "-m", "airflow", "config", "list"],
             env={"PYTHONPATH": os.pathsep.join(sys.path)},
             check=False,
             text=True,
diff --git a/tests/integration/executors/test_celery_executor.py 
b/tests/integration/executors/test_celery_executor.py
index c4c3e5d6f0..9e8d365c5c 100644
--- a/tests/integration/executors/test_celery_executor.py
+++ b/tests/integration/executors/test_celery_executor.py
@@ -39,7 +39,6 @@ from airflow.exceptions import AirflowException, 
AirflowTaskTimeout
 from airflow.models.dag import DAG
 from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
 from airflow.operators.bash import BashOperator
-from airflow.providers.celery.executors import celery_executor, 
celery_executor_utils
 from airflow.utils.state import State
 from tests.test_utils import db
 
@@ -61,6 +60,8 @@ class FakeCeleryResult:
 
 @contextlib.contextmanager
 def _prepare_app(broker_url=None, execute=None):
+    from airflow.providers.celery.executors import celery_executor_utils
+
     broker_url = broker_url or conf.get("celery", "BROKER_URL")
     execute = execute or celery_executor_utils.execute_command.__wrapped__
 
@@ -106,6 +107,8 @@ class TestCeleryExecutor:
     @pytest.mark.flaky(reruns=3)
     @pytest.mark.parametrize("broker_url", _prepare_test_bodies())
     def test_celery_integration(self, broker_url):
+        from airflow.providers.celery.executors import celery_executor, 
celery_executor_utils
+
         success_command = ["airflow", "tasks", "run", "true", "some_parameter"]
         fail_command = ["airflow", "version"]
 
@@ -163,6 +166,8 @@ class TestCeleryExecutor:
         assert executor.queued_tasks == {}
 
     def test_error_sending_task(self):
+        from airflow.providers.celery.executors import celery_executor
+
         def fake_execute_command():
             pass
 
@@ -189,6 +194,7 @@ class TestCeleryExecutor:
 
     def test_retry_on_error_sending_task(self, caplog):
         """Test that Airflow retries publishing tasks to Celery Broker at 
least 3 times"""
+        from airflow.providers.celery.executors import celery_executor, 
celery_executor_utils
 
         with _prepare_app(), caplog.at_level(logging.INFO), mock.patch.object(
             # Mock `with timeout()` to _instantly_ fail.
@@ -268,6 +274,8 @@ class TestBulkStateFetcher:
         return_value=[json.dumps({"status": "SUCCESS", "task_id": "123"})],
     )
     def test_should_support_kv_backend(self, mock_mget, caplog):
+        from airflow.providers.celery.executors import celery_executor, 
celery_executor_utils
+
         caplog.set_level(logging.DEBUG, logger=self.bulk_state_fetcher_logger)
         with _prepare_app():
             mock_backend = BaseKeyValueStoreBackend(app=celery_executor.app)
@@ -293,6 +301,8 @@ class TestBulkStateFetcher:
 
     @mock.patch("celery.backends.database.DatabaseBackend.ResultSession")
     def test_should_support_db_backend(self, mock_session, caplog):
+        from airflow.providers.celery.executors import celery_executor, 
celery_executor_utils
+
         caplog.set_level(logging.DEBUG, logger=self.bulk_state_fetcher_logger)
         with _prepare_app():
             mock_backend = DatabaseBackend(app=celery_executor.app, 
url="sqlite3://")
@@ -318,6 +328,8 @@ class TestBulkStateFetcher:
 
     @mock.patch("celery.backends.database.DatabaseBackend.ResultSession")
     def test_should_retry_db_backend(self, mock_session, caplog):
+        from airflow.providers.celery.executors import celery_executor, 
celery_executor_utils
+
         caplog.set_level(logging.DEBUG, logger=self.bulk_state_fetcher_logger)
         from sqlalchemy.exc import DatabaseError
 
@@ -352,6 +364,8 @@ class TestBulkStateFetcher:
         ]
 
     def test_should_support_base_backend(self, caplog):
+        from airflow.providers.celery.executors import celery_executor_utils
+
         caplog.set_level(logging.DEBUG, logger=self.bulk_state_fetcher_logger)
         with _prepare_app():
             mock_backend = mock.MagicMock(autospec=BaseBackend)
diff --git a/tests/providers/celery/executors/test_celery_executor.py 
b/tests/providers/celery/executors/test_celery_executor.py
index fd00d6a083..c8ddee8c4e 100644
--- a/tests/providers/celery/executors/test_celery_executor.py
+++ b/tests/providers/celery/executors/test_celery_executor.py
@@ -37,7 +37,7 @@ from airflow.configuration import conf
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DAG
 from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
-from airflow.providers.celery.executors import celery_executor, 
celery_executor_utils
+from airflow.providers.celery.executors import celery_executor, 
celery_executor_utils, default_celery
 from airflow.providers.celery.executors.celery_executor import CeleryExecutor
 from airflow.utils import timezone
 from airflow.utils.state import State
@@ -65,6 +65,7 @@ class FakeCeleryResult:
 @contextlib.contextmanager
 def _prepare_app(broker_url=None, execute=None):
     broker_url = broker_url or conf.get("celery", "BROKER_URL")
+
     execute = execute or celery_executor_utils.execute_command.__wrapped__
 
     test_config = dict(celery_executor_utils.celery_configuration)
@@ -185,6 +186,7 @@ class TestCeleryExecutor:
 
         key1 = TaskInstance(task=task_1, run_id=None)
         tis = [key1]
+
         executor = celery_executor.CeleryExecutor()
 
         assert executor.try_adopt_task_instances(tis) == tis
@@ -208,6 +210,7 @@ class TestCeleryExecutor:
         ti2.state = State.QUEUED
 
         tis = [ti1, ti2]
+
         executor = celery_executor.CeleryExecutor()
         assert executor.running == set()
         assert executor.tasks == {}
@@ -243,6 +246,7 @@ class TestCeleryExecutor:
         tis = [ti]
         with _prepare_app() as app:
             app.control.revoke = mock.MagicMock()
+
             executor = celery_executor.CeleryExecutor()
             executor.job_id = 1
             executor.running = {ti.key}
@@ -258,8 +262,6 @@ class TestCeleryExecutor:
     def test_result_backend_sqlalchemy_engine_options(self, mock_celery):
         import importlib
 
-        from airflow.providers.celery.executors import celery_executor_utils, 
default_celery
-
         # reload celery conf to apply the new config
         importlib.reload(default_celery)
         # reload celery_executor_utils to recreate the celery app with new 
config

Reply via email to