potiuk commented on a change in pull request #12466:
URL: https://github.com/apache/airflow/pull/12466#discussion_r531547584
##########
File path: airflow/providers_manager.py
##########
@@ -36,52 +39,231 @@
def _create_validator():
+ """Creates JSON schema validator from the provider.yaml.schema.json"""
schema = json.loads(importlib_resources.read_text('airflow',
'provider.yaml.schema.json'))
cls = jsonschema.validators.validator_for(schema)
validator = cls(schema)
return validator
class ProvidersManager:
- """Manages all provider packages."""
+ """
+ Manages all provider packages. This is a Singleton class. The first time
it is
+ instantiated, it discovers all available providers in installed packages
and
+ local source folders (if airflow is run from sources).
+ """
+
+ _instance = None
+ resource_version = "0"
+
+ def __new__(cls):
+ if cls._instance is None:
+ cls._instance = super().__new__(cls)
+ return cls._instance
def __init__(self):
- self._provider_directory = {}
- try:
- from airflow import providers
- except ImportError as e:
- log.warning("No providers are present or error when importing
them! :%s", e)
- return
+ # Keeps dict of providers keyed by module name and value is Tuple:
version, provider_info
+ self._provider_dict: Dict[str, Tuple[str, Dict]] = {}
+ # Keeps dict of hooks keyed by connection type and value is
+ # Tuple: connection class, connection_id_attribute_name
+ self._hooks_dict: Dict[str, Tuple[str, str]] = {}
self._validator = _create_validator()
- self.__find_all_providers(providers.__path__)
+ # Local source folders are loaded first. They should take precedence
over the package ones for
+ # development purposes. In production, provider.yaml files are not
present in the 'airflow' directory,
+ # so there is no risk we are going to override a package provider
accidentally. This can only happen
+ # in case of local development.
+ self._discover_all_airflow_builtin_providers_from_local_sources()
+ self._discover_all_providers_from_packages()
+ self._discover_hooks()
+ self._sort_provider_dictionary()
+ self._sort_hooks_dictionary()
- def __find_all_providers(self, paths: str):
- def onerror(_):
- exception_string = traceback.format_exc()
- log.warning(exception_string)
+ def _sort_hooks_dictionary(self):
+ """
+ Creates provider_directory as sorted (by package_name) OrderedDict.
- for module_info in pkgutil.walk_packages(paths,
prefix="airflow.providers.", onerror=onerror):
+ Duplicates are removed from "package" providers in case corresponding
"folder" provider is found.
+ The "folder" providers are from local sources (packages do not contain
provider.yaml files),
+ so if someone has airflow installed from local sources, the providers
are imported from there
+ first, so provider information should be taken from there.
+ :return:
+ """
+ sorted_dict = OrderedDict()
+ for connection_type in sorted(self._hooks_dict.keys()):
+ sorted_dict[connection_type] = self._hooks_dict[connection_type]
+ self._hooks_dict = sorted_dict
Review comment:
Yep.
##########
File path: airflow/providers_manager.py
##########
@@ -36,52 +39,231 @@
def _create_validator():
+ """Creates JSON schema validator from the provider.yaml.schema.json"""
schema = json.loads(importlib_resources.read_text('airflow',
'provider.yaml.schema.json'))
cls = jsonschema.validators.validator_for(schema)
validator = cls(schema)
return validator
class ProvidersManager:
- """Manages all provider packages."""
+ """
+ Manages all provider packages. This is a Singleton class. The first time
it is
+ instantiated, it discovers all available providers in installed packages
and
+ local source folders (if airflow is run from sources).
+ """
+
+ _instance = None
+ resource_version = "0"
+
+ def __new__(cls):
+ if cls._instance is None:
+ cls._instance = super().__new__(cls)
+ return cls._instance
def __init__(self):
- self._provider_directory = {}
- try:
- from airflow import providers
- except ImportError as e:
- log.warning("No providers are present or error when importing
them! :%s", e)
- return
+ # Keeps dict of providers keyed by module name and value is Tuple:
version, provider_info
+ self._provider_dict: Dict[str, Tuple[str, Dict]] = {}
+ # Keeps dict of hooks keyed by connection type and value is
+ # Tuple: connection class, connection_id_attribute_name
+ self._hooks_dict: Dict[str, Tuple[str, str]] = {}
self._validator = _create_validator()
- self.__find_all_providers(providers.__path__)
+ # Local source folders are loaded first. They should take precedence
over the package ones for
+ # development purposes. In production, provider.yaml files are not
present in the 'airflow' directory,
+ # so there is no risk we are going to override a package provider
accidentally. This can only happen
+ # in case of local development.
+ self._discover_all_airflow_builtin_providers_from_local_sources()
+ self._discover_all_providers_from_packages()
+ self._discover_hooks()
+ self._sort_provider_dictionary()
+ self._sort_hooks_dictionary()
- def __find_all_providers(self, paths: str):
- def onerror(_):
- exception_string = traceback.format_exc()
- log.warning(exception_string)
+ def _sort_hooks_dictionary(self):
+ """
+ Creates provider_directory as sorted (by package_name) OrderedDict.
- for module_info in pkgutil.walk_packages(paths,
prefix="airflow.providers.", onerror=onerror):
+ Duplicates are removed from "package" providers in case corresponding
"folder" provider is found.
+ The "folder" providers are from local sources (packages do not contain
provider.yaml files),
+ so if someone has airflow installed from local sources, the providers
are imported from there
+ first, so provider information should be taken from there.
+ :return:
+ """
+ sorted_dict = OrderedDict()
+ for connection_type in sorted(self._hooks_dict.keys()):
+ sorted_dict[connection_type] = self._hooks_dict[connection_type]
+ self._hooks_dict = sorted_dict
+
+ def _sort_provider_dictionary(self):
+ """
+ Sort provider_dictionary using OrderedDict.
+
+ The dictionary gets sorted so that when you iterate through it, the
providers are by
+ default returned in alphabetical order.
+ """
+ sorted_dict = OrderedDict()
+ for provider_name in sorted(self._provider_dict.keys()):
+ sorted_dict[provider_name] = self._provider_dict[provider_name]
+ self._provider_dict = sorted_dict
Review comment:
yep
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org