potiuk commented on code in PR #60218:
URL: https://github.com/apache/airflow/pull/60218#discussion_r2698279956


##########
task-sdk/src/airflow/sdk/providers_manager_runtime.py:
##########
@@ -0,0 +1,613 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Manages runtime provider resources for task execution."""
+
+from __future__ import annotations
+
+import functools
+import inspect
+import traceback
+import warnings
+from collections.abc import Callable, MutableMapping
+from typing import TYPE_CHECKING, Any
+from urllib.parse import SplitResult
+
+import structlog
+
+from airflow.sdk._shared.module_loading import import_string
+from airflow.sdk._shared.providers_discovery import (
+    KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS,
+    HookClassProvider,
+    HookInfo,
+    LazyDictWithCache,
+    PluginInfo,
+    ProviderInfo,
+    _check_builtin_provider_prefix,
+    _create_provider_info_schema_validator,
+    discover_all_providers_from_packages,
+    log_import_warning,
+    log_optional_feature_disabled,
+    provider_info_cache,
+)
+from airflow.sdk.definitions._internal.logging_mixin import LoggingMixin
+from airflow.sdk.exceptions import AirflowOptionalProviderFeatureException
+
+if TYPE_CHECKING:
+    from airflow.sdk import BaseHook
+    from airflow.sdk.bases.decorator import TaskDecorator
+    from airflow.sdk.definitions.asset import Asset
+
+log = structlog.getLogger(__name__)
+
+
+def _correctness_check(provider_package: str, class_name: str, provider_info: ProviderInfo) -> Any:
+    """
+    Perform coherence check on provider classes.
+
+    For apache-airflow providers - it checks if it starts with appropriate package. For all providers
+    it tries to import the provider - checking that there are no exceptions during importing.
+    It logs appropriate warning in case it detects any problems.
+
+    :param provider_package: name of the provider package
+    :param class_name: name of the class to import
+
+    :return the class if the class is OK, None otherwise.
+    """
+    if not _check_builtin_provider_prefix(provider_package, class_name):
+        return None
+    try:
+        imported_class = import_string(class_name)
+    except AirflowOptionalProviderFeatureException as e:
+        # When the provider class raises AirflowOptionalProviderFeatureException
+        # this is an expected case when only some classes in provider are
+        # available. We just log debug level here and print info message in logs so that
+        # the user is aware of it
+        log_optional_feature_disabled(class_name, e, provider_package)
+        return None
+    except ImportError as e:
+        if "No module named 'airflow.providers." in e.msg:
+            # handle cases where another provider is missing. This can only happen if
+            # there is an optional feature, so we log debug and print information about it
+            log_optional_feature_disabled(class_name, e, provider_package)
+            return None
+        for known_error in KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS:
+            # Until we convert all providers to use AirflowOptionalProviderFeatureException
+            # we assume any problem with importing another "provider" is because this is an
+            # optional feature, so we log debug and print information about it
+            if known_error[0] == provider_package and known_error[1] in e.msg:
+                log_optional_feature_disabled(class_name, e, provider_package)
+                return None
+        # But when we have no idea - we print warning to logs
+        log_import_warning(class_name, e, provider_package)
+        return None
+    except Exception as e:
+        log_import_warning(class_name, e, provider_package)
+        return None
+    return imported_class
+
+
+class ProvidersManagerRuntime(LoggingMixin):
+    """
+    Manages runtime provider resources for task execution.
+
+    This is a Singleton class. The first time it is instantiated, it discovers all available
+    runtime provider resources (hooks, taskflow decorators, filesystems, asset handlers).
+    """
+
+    resource_version = "0"
+    _initialized: bool = False
+    _initialization_stack_trace = None
+    _instance: ProvidersManagerRuntime | None = None
+
+    def __new__(cls):
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+        return cls._instance
+
+    @staticmethod
+    def initialized() -> bool:
+        return ProvidersManagerRuntime._initialized
+
+    @staticmethod
+    def initialization_stack_trace() -> str | None:
+        return ProvidersManagerRuntime._initialization_stack_trace
+
+    def __init__(self):
+        """Initialize the runtime manager."""
+        # skip initialization if already initialized
+        if self.initialized():
+            return
+        super().__init__()
+        ProvidersManagerRuntime._initialized = True
+        ProvidersManagerRuntime._initialization_stack_trace = "".join(
+            traceback.format_stack(inspect.currentframe())
+        )
+        self._initialized_cache: dict[str, bool] = {}
+        # Keeps dict of providers keyed by module name
+        self._provider_dict: dict[str, ProviderInfo] = {}
+        self._fs_set: set[str] = set()
+        self._asset_uri_handlers: dict[str, Callable[[SplitResult], SplitResult]] = {}
+        self._asset_factories: dict[str, Callable[..., Asset]] = {}
+        self._asset_to_openlineage_converters: dict[str, Callable] = {}
+        self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache()
+        # keeps mapping between connection_types and hook class, package they come from
+        self._hook_provider_dict: dict[str, HookClassProvider] = {}
+        # Keeps dict of hooks keyed by connection type. They are lazy evaluated at access time
+        self._hooks_lazy_dict: LazyDictWithCache[str, HookInfo | Callable] = LazyDictWithCache()
+        self._plugins_set: set[PluginInfo] = set()
+        self._provider_schema_validator = _create_provider_info_schema_validator()
+        self._init_airflow_core_hooks()
+
+    def _init_airflow_core_hooks(self):
+        """Initialize the hooks dict with default hooks from Airflow core."""
+        core_dummy_hooks = {
+            "generic": "Generic",
+            "email": "Email",
+        }
+        for key, display in core_dummy_hooks.items():
+            self._hooks_lazy_dict[key] = HookInfo(
+                hook_class_name=None,
+                connection_id_attribute_name=None,
+                package_name=None,
+                hook_name=display,
+                connection_type=None,
+                connection_testable=False,
+            )
+        for conn_type, class_name in (
+            ("fs", "airflow.providers.standard.hooks.filesystem.FSHook"),
+            ("package_index", "airflow.providers.standard.hooks.package_index.PackageIndexHook"),

Review Comment:
   Hmm. Shouldn't that be moved to the standard provider?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to