ashb commented on a change in pull request #12512:
URL: https://github.com/apache/airflow/pull/12512#discussion_r527943164



##########
File path: airflow/providers_manager.py
##########
@@ -46,42 +48,72 @@ class ProvidersManager:
     """Manages all provider packages."""
 
     def __init__(self):
-        self._provider_directory = {}
-        try:
-            from airflow import providers
-        except ImportError as e:
-            log.warning("No providers are present or error when importing 
them! :%s", e)
-            return
+        # Keeps list of providers keyed by module name and value is Tuple: 
version, provider_info
+        self._provider_directory: Dict[str, Tuple[str, Dict]] = {}
         self._validator = _create_validator()
-        self.__find_all_providers(providers.__path__)
-
-    def __find_all_providers(self, paths: str):
-        def onerror(_):
-            exception_string = traceback.format_exc()
-            log.warning(exception_string)
+        self.__find_all_providers_from_packages()
+        self.__find_all_airflow_builtin_providers_from_local_sources()
 
-        for module_info in pkgutil.walk_packages(paths, 
prefix="airflow.providers.", onerror=onerror):
+    def __find_all_providers_from_packages(self):
+        for entry_point in 
pkg_resources.iter_entry_points('apache_airflow_provider'):
+            package_name = entry_point.dist.project_name
+            version = entry_point.dist.version
             try:
-                imported_module = importlib.import_module(module_info.name)
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when importing %s:%s", module_info.name, e)
+                provider_info = entry_point.load()()
+            except pkg_resources.ContextualVersionConflict as e:
+                log.warning(
+                    "The provider package %s could not be registered because 
of version conflict : %s",
+                    package_name,
+                    e,
+                )
                 continue
-            try:
-                provider = importlib_resources.read_text(imported_module, 
'provider.yaml')
-                provider_info = yaml.safe_load(provider)
-                self._validator.validate(provider_info)
-                self._provider_directory[provider_info['package-name']] = 
provider_info
-            except FileNotFoundError:
-                # This is OK - this is not a provider package
-                pass
-            except TypeError as e:
-                if "is not a package" not in str(e):
-                    log.warning("Error when loading 'provider.yaml' file from 
%s:%s}", module_info.name, e)
-                # Otherwise this is OK - this is likely a module
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when loading 'provider.yaml' file from 
%s:%s", module_info.name, e)
+            self._validator.validate(provider_info)
+            provider_info_package_name = provider_info['package-name']
+            if package_name != provider_info_package_name:
+                raise Exception(
+                    f"The package '{package_name}' from setuptools and "
+                    f"{provider_info_package_name} do not match. Please make 
sure they are"
+                    f"aligned"
+                )
+            self._provider_directory[package_name] = (version, provider_info)
+
+    def __find_all_airflow_builtin_providers_from_local_sources(self):
+        import airflow.providers
+
+        try:
+            for path in airflow.providers.__path__:
+                self.__add_provider_info_from_local_source_files_on_path(path)
+        except Exception as e:  # noqa pylint: disable=broad-except
+            log.warning("Error when loading 'provider.yaml' files from airflow 
sources: %s", e)
+
+    def __add_provider_info_from_local_source_files_on_path(self, path):
+        root_path = path
+        for folder, _, files in os.walk(path):
+            for filename in fnmatch.filter(files, "provider.yaml"):
+                self.__add_provider_info_from_local_source_file(filename, 
folder, root_path)
+
+    def __add_provider_info_from_local_source_file(self, filename, folder, 
root_path):
+        try:
+            with open(os.path.join(folder, filename)) as provider_yaml_file:
+                provider_info = yaml.safe_load(provider_yaml_file.read())
+            self._validator.validate(provider_info)
+            package_name = "apache-airflow-providers" + folder[len(root_path) 
:].replace(os.sep, "-")
+            version = provider_info['versions'][0]
+            self._provider_directory[package_name] = (version, provider_info)
+        except Exception as e:  # noqa pylint: disable=broad-except
+            log.warning("Error when loading '%s/%s': %s", folder, filename, e)
 
     @property
     def providers(self):
         """Returns information about available providers."""
         return self._provider_directory
+
+
+# After we move to python 3.9 + we can get rid of this and use @cache from the 
functools

Review comment:
       Oh nice!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to