ashb commented on a change in pull request #12512:
URL: https://github.com/apache/airflow/pull/12512#discussion_r531578439



##########
File path: airflow/providers_manager.py
##########
@@ -36,52 +38,147 @@
 
 
 def _create_validator():
+    """Creates JSON schema validator from the provider.yaml.schema.json"""
     schema = json.loads(importlib_resources.read_text('airflow', 
'provider.yaml.schema.json'))
     cls = jsonschema.validators.validator_for(schema)
     validator = cls(schema)
     return validator
 
 
 class ProvidersManager:
-    """Manages all provider packages."""
+    """
+    Manages all provider packages. This is a Singleton class. The first time 
it is
+    instantiated, it discovers all available providers in installed packages 
and
+    local source folders (if airflow is run from sources).
+    """
+
+    _instance = None
+    resource_version = "0"
+
+    def __new__(cls):
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+        return cls._instance
 
     def __init__(self):
-        self._provider_directory = {}
-        try:
-            from airflow import providers
-        except ImportError as e:
-            log.warning("No providers are present or error when importing 
them! :%s", e)
-            return
+        # Keeps list of providers keyed by module name and value is Tuple: 
version, provider_info
+        self._provider_dict: Dict[str, Tuple[str, Dict]] = {}
         self._validator = _create_validator()
-        self.__find_all_providers(providers.__path__)
+        # Local source folders are loaded first. They should take precedence 
over the package ones for
+        # Development purpose. In production provider.yaml files are not 
present in the 'airflow" directory
+        # So there is no risk we are going to override package provider 
accidentally. This can only happen
+        # in case of local development
+        self._discover_all_airflow_builtin_providers_from_local_sources()
+        self._discover_all_providers_from_packages()
+        self._sort_provider_dictionary()
+
+    def _sort_provider_dictionary(self):
+        """
+        Sort provider_dictionary using OrderedDict.
 
-    def __find_all_providers(self, paths: str):
-        def onerror(_):
-            exception_string = traceback.format_exc()
-            log.warning(exception_string)
+        The dictionary gets sorted so that when you iterate through it, the 
providers are by
+        default returned in alphabetical order.
+        """
+        sorted_dict = OrderedDict()
+        for provider_name in sorted(self._provider_dict.keys()):
+            sorted_dict[provider_name] = self._provider_dict[provider_name]
+        self._provider_dict = sorted_dict
 
-        for module_info in pkgutil.walk_packages(paths, 
prefix="airflow.providers.", onerror=onerror):
+    def _discover_all_providers_from_packages(self) -> None:
+        """
+        Discovers all providers by scanning packages installed. The list of 
providers should be returned
+        via the 'apache_airflow_provider' entrypoint as a dictionary 
conforming to the
+        'airflow/provider.yaml.schema.json' schema.
+        """
+        for entry_point in 
pkg_resources.iter_entry_points('apache_airflow_provider'):
+            package_name = entry_point.dist.project_name
+            log.debug("Loading %s from package %s", entry_point, package_name)
+            version = entry_point.dist.version
             try:
-                imported_module = importlib.import_module(module_info.name)
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when importing %s:%s", module_info.name, e)
+                provider_info = entry_point.load()()
+            except pkg_resources.VersionConflict as e:
+                log.warning(
+                    "The provider package %s could not be registered because 
of version conflict : %s",
+                    package_name,
+                    e,
+                )
                 continue
-            try:
-                provider = importlib_resources.read_text(imported_module, 
'provider.yaml')
-                provider_info = yaml.safe_load(provider)
-                self._validator.validate(provider_info)
-                self._provider_directory[provider_info['package-name']] = 
provider_info
-            except FileNotFoundError:
-                # This is OK - this is not a provider package
-                pass
-            except TypeError as e:
-                if "is not a package" not in str(e):
-                    log.warning("Error when loading 'provider.yaml' file from 
%s:%s}", module_info.name, e)
-                # Otherwise this is OK - this is likely a module
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when loading 'provider.yaml' file from 
%s:%s", module_info.name, e)
+            self._validator.validate(provider_info)
+            provider_info_package_name = provider_info['package-name']
+            if package_name != provider_info_package_name:
+                raise Exception(
+                    f"The package '{package_name}' from setuptools and "
+                    f"{provider_info_package_name} do not match. Please make 
sure they are"
+                    f"aligned"
+                )
+            if package_name not in self._provider_dict:
+                self._provider_dict[package_name] = (version, provider_info)
+            else:
+                log.warning(
+                    "The provider for package '%s' could not be registered 
from because providers for that "
+                    "package name have already been registered",
+                    package_name,
+                )
+
+    def _discover_all_airflow_builtin_providers_from_local_sources(self) -> 
None:
+        """
+        Finds all built-in airflow providers if airflow is run from the local 
sources.
+        It finds `provider.yaml` files for all such providers and registers 
the providers using those.
+
+        This 'provider.yaml' scanning takes precedence over scanning packages 
installed
+        in case you have both sources and packages installed, the providers 
will be loaded from
+        the "airflow" sources rather than from the packages.
+        """
+        try:
+            import airflow.providers
+        except ImportError:
+            log.info("You have no providers installed.")
+            return
+        try:
+            for path in airflow.providers.__path__:
+                self._add_provider_info_from_local_source_files_on_path(path)
+        except Exception as e:  # noqa pylint: disable=broad-except
+            log.warning("Error when loading 'provider.yaml' files from airflow 
sources: %s", e)
+
+    def _add_provider_info_from_local_source_files_on_path(self, path) -> None:
+        """
+        Finds all the provider.yaml files in the directory specified.
+
+        :param path: path where to look for provider.yaml files
+        """
+        root_path = path
+        for folder, subdirs, files in os.walk(path, topdown=True):
+            for filename in fnmatch.filter(files, "provider.yaml"):
+                package_name = "apache-airflow-providers" + 
folder[len(root_path) :].replace(os.sep, "-")
+                
self._add_provider_info_from_local_source_file(os.path.join(folder, filename), 
package_name)
+                subdirs[:] = []

Review comment:
       ```suggestion
                   # Tell `os.walk` to not recurse any further.
                   subdirs[:] = []
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to