ashb commented on a change in pull request #12512:
URL: https://github.com/apache/airflow/pull/12512#discussion_r528656368



##########
File path: airflow/providers_manager.py
##########
@@ -46,42 +50,136 @@ class ProvidersManager:
     """Manages all provider packages."""
 
     def __init__(self):
-        self._provider_directory = {}
-        try:
-            from airflow import providers
-        except ImportError as e:
-            log.warning("No providers are present or error when importing 
them! :%s", e)
-            return
+        # Keeps list of providers keyed by module name and value is Tuple: 
version, provider_info
+        self._provider_directory: Dict[str, Tuple[str, Dict]] = OrderedDict()
+        self._provider_list: [Tuple[str, str, Dict]] = []
+        self._provider_set: Set[str] = set()
         self._validator = _create_validator()
-        self.__find_all_providers(providers.__path__)
+        # Local source folders are loaded first. They should take precedence 
over the package ones for
+        # Development purpose. In production provider.yaml files are not 
present in the 'airflow" directory
+        # So there is no risk we are going to override package provider 
accidentally. This can only happen
+        # in case of local development
+        self.__find_all_airflow_builtin_providers_from_local_sources()
+        self.__find_all_providers_from_packages()
+        self.__creates_provider_directory()
+
+    def __creates_provider_directory(self):
+        """
+        Creates provider_directory as sorted (by package_name) OrderedDict.
+
+        Duplicates are removed from "package" providers in case corresponding 
"folder" provider is found.
+        The "folder" providers are from local sources (packages do not contain 
provider.yaml files),
+        so if someone has airflow installed from local sources, the providers 
are imported from there
+        first so, provider information should be taken from there.
+        :return:
+        """
+        self._provider_list.sort(key=lambda provider_list_element: 
provider_list_element[0])
+        for package_name, version, provider_info in self._provider_list:
+            self._provider_directory[package_name] = (version, provider_info)
+
+    def __find_all_providers_from_packages(self) -> None:
+        """
+        Finds all providers by scanning packages installed. The list of 
providers should be returned
+        via the 'apache_airflow_provider' entrypoint as a dictionary 
conforming to the
+        'airflow/provider.yaml.schema.json' schema.
 
-    def __find_all_providers(self, paths: str):
-        def onerror(_):
-            exception_string = traceback.format_exc()
-            log.warning(exception_string)
+        The providers are updated in the list of found providers.
 
-        for module_info in pkgutil.walk_packages(paths, 
prefix="airflow.providers.", onerror=onerror):
+        """
+        for entry_point in 
pkg_resources.iter_entry_points('apache_airflow_provider'):
+            package_name = entry_point.dist.project_name
+            version = entry_point.dist.version
             try:
-                imported_module = importlib.import_module(module_info.name)
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when importing %s:%s", module_info.name, e)
+                provider_info = entry_point.load()()
+            except pkg_resources.VersionConflict as e:
+                log.warning(
+                    "The provider package %s could not be registered because 
of version conflict : %s",
+                    package_name,
+                    e,
+                )
                 continue
-            try:
-                provider = importlib_resources.read_text(imported_module, 
'provider.yaml')
-                provider_info = yaml.safe_load(provider)
-                self._validator.validate(provider_info)
-                self._provider_directory[provider_info['package-name']] = 
provider_info
-            except FileNotFoundError:
-                # This is OK - this is not a provider package
-                pass
-            except TypeError as e:
-                if "is not a package" not in str(e):
-                    log.warning("Error when loading 'provider.yaml' file from 
%s:%s}", module_info.name, e)
-                # Otherwise this is OK - this is likely a module
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when loading 'provider.yaml' file from 
%s:%s", module_info.name, e)
+            self._validator.validate(provider_info)
+            provider_info_package_name = provider_info['package-name']
+            if package_name != provider_info_package_name:
+                raise Exception(
+                    f"The package '{package_name}' from setuptools and "
+                    f"{provider_info_package_name} do not match. Please make 
sure they are"
+                    f"aligned"
+                )
+            if package_name not in self._provider_set:
+                self._provider_set.add(package_name)
+                self._provider_list.append((package_name, version, 
provider_info))
+            else:
+                log.warning(
+                    "The providers for package '%s' could not be registered 
because providers for that "
+                    "package name have already been registered",
+                    package_name,
+                )
+
+    def __find_all_airflow_builtin_providers_from_local_sources(self) -> None:
+        """
+        Finds all built-in airflow providers if airflow is run from the local 
sources.
+        It finds `provider.yaml` files for all such providers and registers 
the providers using those.
+
+        This 'provider.yaml' scanning takes precedence over scanning packages 
installed because

Review comment:
       ```suggestion
           This 'provider.yaml' scanning takes precedence over scanning 
packages installed
   ```
   
   "because in case" -> "in case"

##########
File path: airflow/providers_manager.py
##########
@@ -46,42 +50,136 @@ class ProvidersManager:
     """Manages all provider packages."""
 
     def __init__(self):
-        self._provider_directory = {}
-        try:
-            from airflow import providers
-        except ImportError as e:
-            log.warning("No providers are present or error when importing 
them! :%s", e)
-            return
+        # Keeps list of providers keyed by module name and value is Tuple: 
version, provider_info
+        self._provider_directory: Dict[str, Tuple[str, Dict]] = OrderedDict()
+        self._provider_list: [Tuple[str, str, Dict]] = []
+        self._provider_set: Set[str] = set()
         self._validator = _create_validator()
-        self.__find_all_providers(providers.__path__)
+        # Local source folders are loaded first. They should take precedence 
over the package ones for
+        # Development purpose. In production provider.yaml files are not 
present in the 'airflow" directory
+        # So there is no risk we are going to override package provider 
accidentally. This can only happen
+        # in case of local development
+        self.__find_all_airflow_builtin_providers_from_local_sources()
+        self.__find_all_providers_from_packages()
+        self.__creates_provider_directory()
+
+    def __creates_provider_directory(self):
+        """
+        Creates provider_directory as sorted (by package_name) OrderedDict.
+
+        Duplicates are removed from "package" providers in case corresponding 
"folder" provider is found.
+        The "folder" providers are from local sources (packages do not contain 
provider.yaml files),
+        so if someone has airflow installed from local sources, the providers 
are imported from there
+        first so, provider information should be taken from there.
+        :return:
+        """
+        self._provider_list.sort(key=lambda provider_list_element: 
provider_list_element[0])
+        for package_name, version, provider_info in self._provider_list:
+            self._provider_directory[package_name] = (version, provider_info)
+
+    def __find_all_providers_from_packages(self) -> None:
+        """
+        Finds all providers by scanning packages installed. The list of 
providers should be returned
+        via the 'apache_airflow_provider' entrypoint as a dictionary 
conforming to the
+        'airflow/provider.yaml.schema.json' schema.
 
-    def __find_all_providers(self, paths: str):
-        def onerror(_):
-            exception_string = traceback.format_exc()
-            log.warning(exception_string)
+        The providers are updated in the list of found providers.
 
-        for module_info in pkgutil.walk_packages(paths, 
prefix="airflow.providers.", onerror=onerror):
+        """
+        for entry_point in 
pkg_resources.iter_entry_points('apache_airflow_provider'):
+            package_name = entry_point.dist.project_name
+            version = entry_point.dist.version
             try:
-                imported_module = importlib.import_module(module_info.name)
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when importing %s:%s", module_info.name, e)
+                provider_info = entry_point.load()()
+            except pkg_resources.VersionConflict as e:
+                log.warning(
+                    "The provider package %s could not be registered because 
of version conflict : %s",
+                    package_name,
+                    e,
+                )
                 continue
-            try:
-                provider = importlib_resources.read_text(imported_module, 
'provider.yaml')
-                provider_info = yaml.safe_load(provider)
-                self._validator.validate(provider_info)
-                self._provider_directory[provider_info['package-name']] = 
provider_info
-            except FileNotFoundError:
-                # This is OK - this is not a provider package
-                pass
-            except TypeError as e:
-                if "is not a package" not in str(e):
-                    log.warning("Error when loading 'provider.yaml' file from 
%s:%s}", module_info.name, e)
-                # Otherwise this is OK - this is likely a module
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when loading 'provider.yaml' file from 
%s:%s", module_info.name, e)
+            self._validator.validate(provider_info)
+            provider_info_package_name = provider_info['package-name']
+            if package_name != provider_info_package_name:
+                raise Exception(
+                    f"The package '{package_name}' from setuptools and "
+                    f"{provider_info_package_name} do not match. Please make 
sure they are"
+                    f"aligned"
+                )
+            if package_name not in self._provider_set:
+                self._provider_set.add(package_name)
+                self._provider_list.append((package_name, version, 
provider_info))
+            else:
+                log.warning(
+                    "The providers for package '%s' could not be registered 
because providers for that "
+                    "package name have already been registered",
+                    package_name,
+                )
+
+    def __find_all_airflow_builtin_providers_from_local_sources(self) -> None:
+        """
+        Finds all built-in airflow providers if airflow is run from the local 
sources.
+        It finds `provider.yaml` files for all such providers and registers 
the providers using those.
+
+        This 'provider.yaml' scanning takes precedence over scanning packages 
installed because
+        in case you have both sources and packages installed, the providers 
will be loaded from
+        the "airflow" sources rather than from the packages.
+        """
+        import airflow.providers
+
+        try:
+            for path in airflow.providers.__path__:
+                self.__add_provider_info_from_local_source_files_on_path(path)
+        except Exception as e:  # noqa pylint: disable=broad-except
+            log.warning("Error when loading 'provider.yaml' files from airflow 
sources: %s", e)
+
+    def __add_provider_info_from_local_source_files_on_path(self, path) -> 
None:
+        """
+        Finds all the provider.yaml files in the directory specified.
+        :param path: path where to look for provider.yaml files
+        """
+        root_path = path
+        for folder, _, files in os.walk(path):
+            for filename in fnmatch.filter(files, "provider.yaml"):
+                self.__add_provider_info_from_local_source_file(filename, 
folder, root_path)
+
+    def __add_provider_info_from_local_source_file(self, filename, folder, 
root_path) -> None:
+        """
+        Parses found provider.yaml file and adds found provider to the list of 
all providers.
+        :param filename: name of the provider.yaml file
+        :param folder: folder where the provider.yaml file is
+        :param root_path: root path where we started to search 
("airflow/providers" folder from the
+                          local sources
+        """
+        try:
+            with open(os.path.join(folder, filename)) as provider_yaml_file:
+                provider_info = yaml.safe_load(provider_yaml_file.read())

Review comment:
       You can pass the file stream directly to PyYAML; there is no need to call `.read()` first:
   
   ```suggestion
                   provider_info = yaml.safe_load(provider_yaml_file)
   ```

##########
File path: airflow/providers_manager.py
##########
@@ -46,42 +50,136 @@ class ProvidersManager:
     """Manages all provider packages."""
 
     def __init__(self):
-        self._provider_directory = {}
-        try:
-            from airflow import providers
-        except ImportError as e:
-            log.warning("No providers are present or error when importing 
them! :%s", e)
-            return
+        # Keeps list of providers keyed by module name and value is Tuple: 
version, provider_info
+        self._provider_directory: Dict[str, Tuple[str, Dict]] = OrderedDict()
+        self._provider_list: [Tuple[str, str, Dict]] = []
+        self._provider_set: Set[str] = set()
         self._validator = _create_validator()
-        self.__find_all_providers(providers.__path__)
+        # Local source folders are loaded first. They should take precedence 
over the package ones for
+        # Development purpose. In production provider.yaml files are not 
present in the 'airflow" directory
+        # So there is no risk we are going to override package provider 
accidentally. This can only happen
+        # in case of local development
+        self.__find_all_airflow_builtin_providers_from_local_sources()
+        self.__find_all_providers_from_packages()
+        self.__creates_provider_directory()
+
+    def __creates_provider_directory(self):
+        """
+        Creates provider_directory as sorted (by package_name) OrderedDict.
+
+        Duplicates are removed from "package" providers in case corresponding 
"folder" provider is found.
+        The "folder" providers are from local sources (packages do not contain 
provider.yaml files),
+        so if someone has airflow installed from local sources, the providers 
are imported from there
+        first so, provider information should be taken from there.
+        :return:
+        """
+        self._provider_list.sort(key=lambda provider_list_element: 
provider_list_element[0])
+        for package_name, version, provider_info in self._provider_list:
+            self._provider_directory[package_name] = (version, provider_info)
+
+    def __find_all_providers_from_packages(self) -> None:
+        """
+        Finds all providers by scanning packages installed. The list of 
providers should be returned
+        via the 'apache_airflow_provider' entrypoint as a dictionary 
conforming to the
+        'airflow/provider.yaml.schema.json' schema.
 
-    def __find_all_providers(self, paths: str):
-        def onerror(_):
-            exception_string = traceback.format_exc()
-            log.warning(exception_string)
+        The providers are updated in the list of found providers.
 
-        for module_info in pkgutil.walk_packages(paths, 
prefix="airflow.providers.", onerror=onerror):
+        """
+        for entry_point in 
pkg_resources.iter_entry_points('apache_airflow_provider'):
+            package_name = entry_point.dist.project_name
+            version = entry_point.dist.version
             try:
-                imported_module = importlib.import_module(module_info.name)
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when importing %s:%s", module_info.name, e)
+                provider_info = entry_point.load()()
+            except pkg_resources.VersionConflict as e:
+                log.warning(
+                    "The provider package %s could not be registered because 
of version conflict : %s",
+                    package_name,
+                    e,
+                )
                 continue
-            try:
-                provider = importlib_resources.read_text(imported_module, 
'provider.yaml')
-                provider_info = yaml.safe_load(provider)
-                self._validator.validate(provider_info)
-                self._provider_directory[provider_info['package-name']] = 
provider_info
-            except FileNotFoundError:
-                # This is OK - this is not a provider package
-                pass
-            except TypeError as e:
-                if "is not a package" not in str(e):
-                    log.warning("Error when loading 'provider.yaml' file from 
%s:%s}", module_info.name, e)
-                # Otherwise this is OK - this is likely a module
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when loading 'provider.yaml' file from 
%s:%s", module_info.name, e)
+            self._validator.validate(provider_info)
+            provider_info_package_name = provider_info['package-name']
+            if package_name != provider_info_package_name:
+                raise Exception(
+                    f"The package '{package_name}' from setuptools and "
+                    f"{provider_info_package_name} do not match. Please make 
sure they are"
+                    f"aligned"
+                )
+            if package_name not in self._provider_set:
+                self._provider_set.add(package_name)
+                self._provider_list.append((package_name, version, 
provider_info))
+            else:
+                log.warning(
+                    "The providers for package '%s' could not be registered 
because providers for that "
+                    "package name have already been registered",
+                    package_name,
+                )
+
+    def __find_all_airflow_builtin_providers_from_local_sources(self) -> None:
+        """
+        Finds all built-in airflow providers if airflow is run from the local 
sources.
+        It finds `provider.yaml` files for all such providers and registers 
the providers using those.
+
+        This 'provider.yaml' scanning takes precedence over scanning packages 
installed because
+        in case you have both sources and packages installed, the providers 
will be loaded from
+        the "airflow" sources rather than from the packages.
+        """
+        import airflow.providers
+
+        try:
+            for path in airflow.providers.__path__:
+                self.__add_provider_info_from_local_source_files_on_path(path)
+        except Exception as e:  # noqa pylint: disable=broad-except
+            log.warning("Error when loading 'provider.yaml' files from airflow 
sources: %s", e)
+
+    def __add_provider_info_from_local_source_files_on_path(self, path) -> 
None:
+        """
+        Finds all the provider.yaml files in the directory specified.
+        :param path: path where to look for provider.yaml files
+        """
+        root_path = path
+        for folder, _, files in os.walk(path):
+            for filename in fnmatch.filter(files, "provider.yaml"):
+                self.__add_provider_info_from_local_source_file(filename, 
folder, root_path)

Review comment:
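       Once we find a `provider.yaml`, we can stop descending into that subtree. To do that with `os.walk` (top-down, the default), we have to modify the `dirnames` list it yields (the 2nd element of each tuple) in place, using `del` or a slice assignment. Rebinding the name has no effect.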
   
   ```suggestion
           for folder, subfolders, files in os.walk(path):
               for filename in fnmatch.filter(files, "provider.yaml"):
                   self.__add_provider_info_from_local_source_file(filename, 
folder, root_path)
                   subfolders[:] = []
   ```

##########
File path: airflow/providers_manager.py
##########
@@ -46,42 +50,136 @@ class ProvidersManager:
     """Manages all provider packages."""
 
     def __init__(self):
-        self._provider_directory = {}
-        try:
-            from airflow import providers
-        except ImportError as e:
-            log.warning("No providers are present or error when importing 
them! :%s", e)
-            return
+        # Keeps list of providers keyed by module name and value is Tuple: 
version, provider_info
+        self._provider_directory: Dict[str, Tuple[str, Dict]] = OrderedDict()
+        self._provider_list: [Tuple[str, str, Dict]] = []
+        self._provider_set: Set[str] = set()
         self._validator = _create_validator()
-        self.__find_all_providers(providers.__path__)
+        # Local source folders are loaded first. They should take precedence 
over the package ones for
+        # Development purpose. In production provider.yaml files are not 
present in the 'airflow" directory
+        # So there is no risk we are going to override package provider 
accidentally. This can only happen
+        # in case of local development
+        self.__find_all_airflow_builtin_providers_from_local_sources()
+        self.__find_all_providers_from_packages()
+        self.__creates_provider_directory()
+
+    def __creates_provider_directory(self):
+        """
+        Creates provider_directory as sorted (by package_name) OrderedDict.
+
+        Duplicates are removed from "package" providers in case corresponding 
"folder" provider is found.
+        The "folder" providers are from local sources (packages do not contain 
provider.yaml files),
+        so if someone has airflow installed from local sources, the providers 
are imported from there
+        first so, provider information should be taken from there.
+        :return:
+        """
+        self._provider_list.sort(key=lambda provider_list_element: 
provider_list_element[0])
+        for package_name, version, provider_info in self._provider_list:
+            self._provider_directory[package_name] = (version, provider_info)
+
+    def __find_all_providers_from_packages(self) -> None:
+        """
+        Finds all providers by scanning packages installed. The list of 
providers should be returned
+        via the 'apache_airflow_provider' entrypoint as a dictionary 
conforming to the
+        'airflow/provider.yaml.schema.json' schema.
 
-    def __find_all_providers(self, paths: str):
-        def onerror(_):
-            exception_string = traceback.format_exc()
-            log.warning(exception_string)
+        The providers are updated in the list of found providers.
 
-        for module_info in pkgutil.walk_packages(paths, 
prefix="airflow.providers.", onerror=onerror):
+        """
+        for entry_point in 
pkg_resources.iter_entry_points('apache_airflow_provider'):
+            package_name = entry_point.dist.project_name
+            version = entry_point.dist.version
             try:
-                imported_module = importlib.import_module(module_info.name)
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when importing %s:%s", module_info.name, e)
+                provider_info = entry_point.load()()
+            except pkg_resources.VersionConflict as e:
+                log.warning(
+                    "The provider package %s could not be registered because 
of version conflict : %s",
+                    package_name,
+                    e,
+                )
                 continue
-            try:
-                provider = importlib_resources.read_text(imported_module, 
'provider.yaml')
-                provider_info = yaml.safe_load(provider)
-                self._validator.validate(provider_info)
-                self._provider_directory[provider_info['package-name']] = 
provider_info
-            except FileNotFoundError:
-                # This is OK - this is not a provider package
-                pass
-            except TypeError as e:
-                if "is not a package" not in str(e):
-                    log.warning("Error when loading 'provider.yaml' file from 
%s:%s}", module_info.name, e)
-                # Otherwise this is OK - this is likely a module
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when loading 'provider.yaml' file from 
%s:%s", module_info.name, e)
+            self._validator.validate(provider_info)
+            provider_info_package_name = provider_info['package-name']
+            if package_name != provider_info_package_name:
+                raise Exception(
+                    f"The package '{package_name}' from setuptools and "
+                    f"{provider_info_package_name} do not match. Please make 
sure they are"
+                    f"aligned"
+                )
+            if package_name not in self._provider_set:
+                self._provider_set.add(package_name)
+                self._provider_list.append((package_name, version, 
provider_info))
+            else:
+                log.warning(
+                    "The providers for package '%s' could not be registered 
because providers for that "
+                    "package name have already been registered",
+                    package_name,
+                )
+
+    def __find_all_airflow_builtin_providers_from_local_sources(self) -> None:
+        """
+        Finds all built-in airflow providers if airflow is run from the local 
sources.
+        It finds `provider.yaml` files for all such providers and registers 
the providers using those.
+
+        This 'provider.yaml' scanning takes precedence over scanning packages 
installed because
+        in case you have both sources and packages installed, the providers 
will be loaded from
+        the "airflow" sources rather than from the packages.
+        """
+        import airflow.providers
+
+        try:
+            for path in airflow.providers.__path__:
+                self.__add_provider_info_from_local_source_files_on_path(path)
+        except Exception as e:  # noqa pylint: disable=broad-except
+            log.warning("Error when loading 'provider.yaml' files from airflow 
sources: %s", e)
+
+    def __add_provider_info_from_local_source_files_on_path(self, path) -> 
None:
+        """
+        Finds all the provider.yaml files in the directory specified.
+        :param path: path where to look for provider.yaml files
+        """
+        root_path = path
+        for folder, _, files in os.walk(path):
+            for filename in fnmatch.filter(files, "provider.yaml"):
+                self.__add_provider_info_from_local_source_file(filename, 
folder, root_path)
+
+    def __add_provider_info_from_local_source_file(self, filename, folder, 
root_path) -> None:

Review comment:
       A signature change please:
   
   ```suggestion
       def __add_provider_info_from_local_source_file(self, filename, 
provider_dist_name) -> None: 
   ```
   Here `filename` is the full path to the YAML file to load, and `provider_dist_name` is the name to register it under.
   (Call it `dist_name` rather than `package_name`, because `-` is valid in distribution names but not in Python package names.)
   
   For example, called like this:
   
   ```python
           for folder, _, files in os.walk(path):
               for filename in fnmatch.filter(files, "provider.yaml"):
                   dist_name = "apache-airflow-providers" + 
folder[len(root_path) + 1 :].replace(os.sep, "-")
                   
self.__add_provider_info_from_local_source_file(os.path.join(folder, filename), 
dist_name) 
   ```
   
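   (Note: the prefix in this example needs a trailing hyphen, i.e. `"apache-airflow-providers-"`. Otherwise a folder such as `google/cloud` would be registered as `apache-airflow-providersgoogle-cloud` instead of `apache-airflow-providers-google-cloud`.)
   
   I think this change will make it easier to reuse this function for plugins.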

##########
File path: airflow/providers_manager.py
##########
@@ -46,42 +50,136 @@ class ProvidersManager:
     """Manages all provider packages."""
 
     def __init__(self):
-        self._provider_directory = {}
-        try:
-            from airflow import providers
-        except ImportError as e:
-            log.warning("No providers are present or error when importing 
them! :%s", e)
-            return
+        # Keeps list of providers keyed by module name and value is Tuple: 
version, provider_info
+        self._provider_directory: Dict[str, Tuple[str, Dict]] = OrderedDict()
+        self._provider_list: [Tuple[str, str, Dict]] = []
+        self._provider_set: Set[str] = set()
         self._validator = _create_validator()
-        self.__find_all_providers(providers.__path__)
+        # Local source folders are loaded first. They should take precedence 
over the package ones for
+        # Development purpose. In production provider.yaml files are not 
present in the 'airflow" directory
+        # So there is no risk we are going to override package provider 
accidentally. This can only happen
+        # in case of local development
+        self.__find_all_airflow_builtin_providers_from_local_sources()
+        self.__find_all_providers_from_packages()
+        self.__creates_provider_directory()
+
+    def __creates_provider_directory(self):
+        """
+        Creates provider_directory as sorted (by package_name) OrderedDict.
+
+        Duplicates are removed from "package" providers in case corresponding 
"folder" provider is found.
+        The "folder" providers are from local sources (packages do not contain 
provider.yaml files),
+        so if someone has airflow installed from local sources, the providers 
are imported from there
+        first so, provider information should be taken from there.
+        :return:
+        """
+        self._provider_list.sort(key=lambda provider_list_element: 
provider_list_element[0])
+        for package_name, version, provider_info in self._provider_list:
+            self._provider_directory[package_name] = (version, provider_info)
+
+    def __find_all_providers_from_packages(self) -> None:
+        """
+        Finds all providers by scanning packages installed. The list of 
providers should be returned
+        via the 'apache_airflow_provider' entrypoint as a dictionary 
conforming to the
+        'airflow/provider.yaml.schema.json' schema.
 
-    def __find_all_providers(self, paths: str):
-        def onerror(_):
-            exception_string = traceback.format_exc()
-            log.warning(exception_string)
+        The providers are updated in the list of found providers.
 
-        for module_info in pkgutil.walk_packages(paths, 
prefix="airflow.providers.", onerror=onerror):
+        """
+        for entry_point in 
pkg_resources.iter_entry_points('apache_airflow_provider'):
+            package_name = entry_point.dist.project_name
+            version = entry_point.dist.version
             try:
-                imported_module = importlib.import_module(module_info.name)
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when importing %s:%s", module_info.name, e)
+                provider_info = entry_point.load()()
+            except pkg_resources.VersionConflict as e:
+                log.warning(
+                    "The provider package %s could not be registered because 
of version conflict : %s",
+                    package_name,
+                    e,
+                )
                 continue
-            try:
-                provider = importlib_resources.read_text(imported_module, 
'provider.yaml')
-                provider_info = yaml.safe_load(provider)
-                self._validator.validate(provider_info)
-                self._provider_directory[provider_info['package-name']] = 
provider_info
-            except FileNotFoundError:
-                # This is OK - this is not a provider package
-                pass
-            except TypeError as e:
-                if "is not a package" not in str(e):
-                    log.warning("Error when loading 'provider.yaml' file from 
%s:%s}", module_info.name, e)
-                # Otherwise this is OK - this is likely a module
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when loading 'provider.yaml' file from 
%s:%s", module_info.name, e)
+            self._validator.validate(provider_info)
+            provider_info_package_name = provider_info['package-name']
+            if package_name != provider_info_package_name:
+                raise Exception(
+                    f"The package '{package_name}' from setuptools and "
+                    f"{provider_info_package_name} do not match. Please make 
sure they are"
+                    f"aligned"
+                )
+            if package_name not in self._provider_set:
+                self._provider_set.add(package_name)
+                self._provider_list.append((package_name, version, 
provider_info))
+            else:
+                log.warning(
+                    "The providers for package '%s' could not be registered 
because providers for that "
+                    "package name have already been registered",
+                    package_name,
+                )
+
+    def __find_all_airflow_builtin_providers_from_local_sources(self) -> None:
+        """
+        Finds all built-in airflow providers if airflow is run from the local 
sources.
+        It finds `provider.yaml` files for all such providers and registers 
the providers using those.
+
+        This 'provider.yaml' scanning takes precedence over scanning packages 
installed because
+        in case you have both sources and packages installed, the providers 
will be loaded from
+        the "airflow" sources rather than from the packages.
+        """
+        import airflow.providers
+
+        try:
+            for path in airflow.providers.__path__:
+                self.__add_provider_info_from_local_source_files_on_path(path)
+        except Exception as e:  # noqa pylint: disable=broad-except
+            log.warning("Error when loading 'provider.yaml' files from airflow 
sources: %s", e)
+
+    def __add_provider_info_from_local_source_files_on_path(self, path) -> 
None:
+        """
+        Finds all the provider.yaml files in the directory specified.
+        :param path: path where to look for provider.yaml files
+        """
+        root_path = path
+        for folder, _, files in os.walk(path):
+            for filename in fnmatch.filter(files, "provider.yaml"):
+                self.__add_provider_info_from_local_source_file(filename, 
folder, root_path)
+
+    def __add_provider_info_from_local_source_file(self, filename, folder, 
root_path) -> None:
+        """
+        Parses found provider.yaml file and adds found provider to the list of 
all providers.
+        :param filename: name of the provider.yaml file
+        :param folder: folder where the provider.yaml file is
+        :param root_path: root path where we started to search 
("airflow/providers" folder from the
+                          local sources
+        """
+        try:
+            with open(os.path.join(folder, filename)) as provider_yaml_file:
+                provider_info = yaml.safe_load(provider_yaml_file.read())
+            self._validator.validate(provider_info)
+            package_name = "apache-airflow-providers" + folder[len(root_path) 
:].replace(os.sep, "-")
+            version = provider_info['versions'][0]
+            if package_name not in self._provider_set:
+                self._provider_set.add(package_name)
+                self._provider_list.append((package_name, version, 
provider_info))

Review comment:
       How about a single dict, rather than a set and a list?

##########
File path: dev/provider_packages/get_provider_info_TEMPLATE.py.jinja2
##########
@@ -0,0 +1,13 @@
+import yaml
+
+try:
+    import importlib.resources as importlib_resources
+except ImportError:
+    # Try backported to PY<37 `importlib_resources`.
+    import importlib_resources
+
+
+def get_provider_info():
+    return yaml.safe_load("""
+{{ PROVIDER_INFO }}
+""")

Review comment:
       Do we need to YAML-load this? Could we not return a Python dict directly?

##########
File path: airflow/providers_manager.py
##########
@@ -46,42 +50,136 @@ class ProvidersManager:
     """Manages all provider packages."""
 
     def __init__(self):
-        self._provider_directory = {}
-        try:
-            from airflow import providers
-        except ImportError as e:
-            log.warning("No providers are present or error when importing 
them! :%s", e)
-            return
+        # Keeps list of providers keyed by module name and value is Tuple: 
version, provider_info
+        self._provider_directory: Dict[str, Tuple[str, Dict]] = OrderedDict()
+        self._provider_list: [Tuple[str, str, Dict]] = []
+        self._provider_set: Set[str] = set()
         self._validator = _create_validator()
-        self.__find_all_providers(providers.__path__)
+        # Local source folders are loaded first. They should take precedence 
over the package ones for
+        # Development purpose. In production provider.yaml files are not 
present in the 'airflow" directory
+        # So there is no risk we are going to override package provider 
accidentally. This can only happen
+        # in case of local development
+        self.__find_all_airflow_builtin_providers_from_local_sources()
+        self.__find_all_providers_from_packages()
+        self.__creates_provider_directory()
+
+    def __creates_provider_directory(self):
+        """
+        Creates provider_directory as sorted (by package_name) OrderedDict.
+
+        Duplicates are removed from "package" providers in case corresponding 
"folder" provider is found.
+        The "folder" providers are from local sources (packages do not contain 
provider.yaml files),
+        so if someone has airflow installed from local sources, the providers 
are imported from there
+        first so, provider information should be taken from there.
+        :return:
+        """
+        self._provider_list.sort(key=lambda provider_list_element: 
provider_list_element[0])
+        for package_name, version, provider_info in self._provider_list:
+            self._provider_directory[package_name] = (version, provider_info)
+
+    def __find_all_providers_from_packages(self) -> None:
+        """
+        Finds all providers by scanning packages installed. The list of 
providers should be returned
+        via the 'apache_airflow_provider' entrypoint as a dictionary 
conforming to the
+        'airflow/provider.yaml.schema.json' schema.
 
-    def __find_all_providers(self, paths: str):
-        def onerror(_):
-            exception_string = traceback.format_exc()
-            log.warning(exception_string)
+        The providers are updated in the list of found providers.
 
-        for module_info in pkgutil.walk_packages(paths, 
prefix="airflow.providers.", onerror=onerror):
+        """
+        for entry_point in 
pkg_resources.iter_entry_points('apache_airflow_provider'):
+            package_name = entry_point.dist.project_name
+            version = entry_point.dist.version
             try:
-                imported_module = importlib.import_module(module_info.name)
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when importing %s:%s", module_info.name, e)
+                provider_info = entry_point.load()()
+            except pkg_resources.VersionConflict as e:
+                log.warning(
+                    "The provider package %s could not be registered because 
of version conflict : %s",
+                    package_name,
+                    e,
+                )
                 continue
-            try:
-                provider = importlib_resources.read_text(imported_module, 
'provider.yaml')
-                provider_info = yaml.safe_load(provider)
-                self._validator.validate(provider_info)
-                self._provider_directory[provider_info['package-name']] = 
provider_info
-            except FileNotFoundError:
-                # This is OK - this is not a provider package
-                pass
-            except TypeError as e:
-                if "is not a package" not in str(e):
-                    log.warning("Error when loading 'provider.yaml' file from 
%s:%s}", module_info.name, e)
-                # Otherwise this is OK - this is likely a module
-            except Exception as e:  # noqa pylint: disable=broad-except
-                log.warning("Error when loading 'provider.yaml' file from 
%s:%s", module_info.name, e)
+            self._validator.validate(provider_info)
+            provider_info_package_name = provider_info['package-name']
+            if package_name != provider_info_package_name:
+                raise Exception(
+                    f"The package '{package_name}' from setuptools and "
+                    f"{provider_info_package_name} do not match. Please make 
sure they are"
+                    f"aligned"
+                )
+            if package_name not in self._provider_set:
+                self._provider_set.add(package_name)
+                self._provider_list.append((package_name, version, 
provider_info))
+            else:
+                log.warning(
+                    "The providers for package '%s' could not be registered 
because providers for that "
+                    "package name have already been registered",
+                    package_name,
+                )
+
+    def __find_all_airflow_builtin_providers_from_local_sources(self) -> None:
+        """
+        Finds all built-in airflow providers if airflow is run from the local 
sources.
+        It finds `provider.yaml` files for all such providers and registers 
the providers using those.
+
+        This 'provider.yaml' scanning takes precedence over scanning packages 
installed because
+        in case you have both sources and packages installed, the providers 
will be loaded from
+        the "airflow" sources rather than from the packages.
+        """
+        import airflow.providers
+
+        try:
+            for path in airflow.providers.__path__:
+                self.__add_provider_info_from_local_source_files_on_path(path)
+        except Exception as e:  # noqa pylint: disable=broad-except
+            log.warning("Error when loading 'provider.yaml' files from airflow 
sources: %s", e)
+
+    def __add_provider_info_from_local_source_files_on_path(self, path) -> 
None:
+        """
+        Finds all the provider.yaml files in the directory specified.
+        :param path: path where to look for provider.yaml files
+        """
+        root_path = path
+        for folder, _, files in os.walk(path):
+            for filename in fnmatch.filter(files, "provider.yaml"):
+                self.__add_provider_info_from_local_source_file(filename, 
folder, root_path)
+
+    def __add_provider_info_from_local_source_file(self, filename, folder, 
root_path) -> None:
+        """
+        Parses found provider.yaml file and adds found provider to the list of 
all providers.
+        :param filename: name of the provider.yaml file
+        :param folder: folder where the provider.yaml file is
+        :param root_path: root path where we started to search 
("airflow/providers" folder from the
+                          local sources
+        """
+        try:
+            with open(os.path.join(folder, filename)) as provider_yaml_file:
+                provider_info = yaml.safe_load(provider_yaml_file.read())
+            self._validator.validate(provider_info)
+            package_name = "apache-airflow-providers" + folder[len(root_path) 
:].replace(os.sep, "-")
+            version = provider_info['versions'][0]
+            if package_name not in self._provider_set:
+                self._provider_set.add(package_name)
+                self._provider_list.append((package_name, version, 
provider_info))
+            else:
+                log.warning(
+                    "The providers for package '%s' could not be registered 
because providers for that "
+                    "package name have already been registered",
+                    package_name,
+                )
+        except Exception as e:  # noqa pylint: disable=broad-except
+            log.warning("Error when loading '%s/%s': %s", folder, filename, e)
 
     @property
     def providers(self):
         """Returns information about available providers."""
         return self._provider_directory
+
+
+# After we move to python 3.9 + we can get rid of this and use @cache from the 
functools
+cache = lru_cache(maxsize=None)
+
+
+@cache
+def get_providers_manager() -> ProvidersManager:  # noqa
+    """Returns singleton instance of ProvidersManager"""
+    return ProvidersManager()

Review comment:
       We could do this more "formally" as Daniel did in 
https://github.com/apache/airflow/commit/3ca11eb9b02a2c2591292fd6b76e0e98b8f22656#diff-681de8974a439f70dfa41705f5c1681ecce615fac6c4c715c1978d28d8f0da84R63




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to