This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8765fc990a7 Remove unused discovery using sources from Providers
Manager (#46840)
8765fc990a7 is described below
commit 8765fc990a74aebd3aebefd25dc25e6ec99680f1
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu Feb 20 10:21:09 2025 +0100
Remove unused discovery using sources from Providers Manager (#46840)
Since we moved all providers to separate distributions/packages
each of them is installed from sources using their own distribution
and they have entrypoints added even if they are installed in
editable mode. This allows us to remove the "source" discovery
mechanism that read provider_info information from yaml rather than
from get_provider_info entrypoint.
A small drawback of this change is that you need to run pre-commits
when you change provider.yaml (or manually modify the get_provider_info
entrypoint), because only then will it be reflected in the entrypoint retrieval.
---
airflow/providers_manager.py | 111 +--------------------
tests/always/test_providers_manager.py | 7 --
.../endpoints/test_provider_endpoint.py | 2 -
.../core_api/routes/public/test_providers.py | 2 -
4 files changed, 3 insertions(+), 119 deletions(-)
diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py
index 9b39439384f..3c19a5d4797 100644
--- a/airflow/providers_manager.py
+++ b/airflow/providers_manager.py
@@ -19,12 +19,10 @@
from __future__ import annotations
-import fnmatch
import functools
import inspect
import json
import logging
-import os
import traceback
import warnings
from collections.abc import MutableMapping
@@ -40,7 +38,6 @@ from airflow.exceptions import
AirflowOptionalProviderFeatureException
from airflow.providers.standard.hooks.filesystem import FSHook
from airflow.providers.standard.hooks.package_index import PackageIndexHook
from airflow.typing_compat import ParamSpec
-from airflow.utils import yaml
from airflow.utils.entry_points import entry_points_with_dist
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string
@@ -86,7 +83,6 @@ def _ensure_prefix_for_placeholders(field_behaviors:
dict[str, Any], conn_type:
if TYPE_CHECKING:
- from typing import Literal
from urllib.parse import SplitResult
from airflow.decorators.base import TaskDecorator
@@ -198,21 +194,10 @@ class ProviderInfo:
:param version: version string
:param data: dictionary with information about the provider
- :param source_or_package: whether the provider is source files or PyPI
package. When installed from
- sources we suppress provider import errors.
"""
version: str
data: dict
- package_or_source: Literal["source", "package"]
-
- def __post_init__(self):
- if self.package_or_source not in ("source", "package"):
- raise ValueError(
- f"Received {self.package_or_source!r} for `package_or_source`.
"
- "Must be either 'package' or 'source'."
- )
- self.is_source = self.package_or_source == "source"
class HookClassProvider(NamedTuple):
@@ -275,16 +260,6 @@ class ConnectionFormWidgetInfo(NamedTuple):
is_sensitive: bool
-def log_debug_import_from_sources(class_name, e, provider_package):
- """Log debug imports from sources."""
- log.debug(
- "Optional feature disabled on exception when importing '%s' from '%s'
package",
- class_name,
- provider_package,
- exc_info=e,
- )
-
-
def log_optional_feature_disabled(class_name, e, provider_package):
"""Log optional feature disabled."""
log.debug(
@@ -342,12 +317,6 @@ def _correctness_check(provider_package: str, class_name:
str, provider_info: Pr
log_optional_feature_disabled(class_name, e, provider_package)
return None
except ImportError as e:
- if provider_info.is_source:
- # When we have providers from sources, then we just turn all
import logs to debug logs
- # As this is pretty expected that you have a number of
dependencies not installed
- # (we always have all providers from sources until we split
providers to separate repo)
- log_debug_import_from_sources(class_name, e, provider_package)
- return None
if "No module named 'airflow.providers." in e.msg:
# handle cases where another provider is missing. This can only
happen if
# there is an optional feature, so we log debug and print
information about it
@@ -408,8 +377,7 @@ class ProvidersManager(LoggingMixin, metaclass=Singleton):
Manages all provider packages.
This is a Singleton class. The first time it is
- instantiated, it discovers all available providers in installed packages
and
- local source folders (if airflow is run from sources).
+ instantiated, it discovers all available providers in installed packages.
"""
resource_version = "0"
@@ -501,7 +469,6 @@ class ProvidersManager(LoggingMixin, metaclass=Singleton):
# Development purpose. In production provider.yaml files are not
present in the 'airflow" directory
# So there is no risk we are going to override package provider
accidentally. This can only happen
# in case of local development
- self._discover_all_airflow_builtin_providers_from_local_sources()
self._discover_all_providers_from_packages()
self._verify_all_providers_all_compatible()
self._provider_dict = dict(sorted(self._provider_dict.items()))
@@ -640,11 +607,11 @@ class ProvidersManager(LoggingMixin, metaclass=Singleton):
provider_info_package_name = provider_info["package-name"]
if package_name != provider_info_package_name:
raise ValueError(
- f"The package '{package_name}' from setuptools and "
+ f"The package '{package_name}' from packaging information "
f"{provider_info_package_name} do not match. Please make
sure they are aligned"
)
if package_name not in self._provider_dict:
- self._provider_dict[package_name] = ProviderInfo(version,
provider_info, "package")
+ self._provider_dict[package_name] = ProviderInfo(version,
provider_info)
else:
log.warning(
"The provider for package '%s' could not be registered
from because providers for that "
@@ -652,78 +619,6 @@ class ProvidersManager(LoggingMixin, metaclass=Singleton):
package_name,
)
- def _discover_all_airflow_builtin_providers_from_local_sources(self) ->
None:
- """
- Find all built-in airflow providers if airflow is run from the local
sources.
-
- It finds `provider.yaml` files for all such providers and registers
the providers using those.
-
- This 'provider.yaml' scanning takes precedence over scanning packages
installed
- in case you have both sources and packages installed, the providers
will be loaded from
- the "airflow" sources rather than from the packages.
- """
- try:
- import airflow.providers
- except ImportError:
- log.info("You have no providers installed.")
- return
-
- seen = set()
- for path in airflow.providers.__path__: # type: ignore[attr-defined]
- try:
- # The same path can appear in the __path__ twice, under
non-normalized paths (ie.
- # /path/to/repo/airflow/providers and
/path/to/repo/./airflow/providers)
- path = os.path.realpath(path)
- if path not in seen:
- seen.add(path)
-
self._add_provider_info_from_local_source_files_on_path(path)
- except Exception as e:
- log.warning("Error when loading 'provider.yaml' files from %s
airflow sources: %s", path, e)
- # TODO: AIP-66: Remove this when the package is moved to providers
-
self._add_provider_info_from_local_source_files_on_path("airflow/dag_processing")
-
- def _add_provider_info_from_local_source_files_on_path(self, path) -> None:
- """
- Find all the provider.yaml files in the directory specified.
-
- :param path: path where to look for provider.yaml files
- """
- root_path = path
- for folder, subdirs, files in os.walk(path, topdown=True):
- for filename in fnmatch.filter(files, "provider.yaml"):
- try:
- package_name = "apache-airflow-providers" +
folder[len(root_path) :].replace(os.sep, "-")
- self._add_provider_info_from_local_source_file(
- os.path.join(folder, filename), package_name
- )
- subdirs[:] = []
- except Exception as e:
- log.warning("Error when loading 'provider.yaml' file from
%s %e", folder, e)
-
- def _add_provider_info_from_local_source_file(self, path, package_name) ->
None:
- """
- Parse found provider.yaml file and adds found provider to the
dictionary.
-
- :param path: full file path of the provider.yaml file
- :param package_name: name of the package
- """
- try:
- log.debug("Loading %s from %s", package_name, path)
- with open(path) as provider_yaml_file:
- provider_info = yaml.safe_load(provider_yaml_file)
- self._provider_schema_validator.validate(provider_info)
- version = provider_info["versions"][0]
- if package_name not in self._provider_dict:
- self._provider_dict[package_name] = ProviderInfo(version,
provider_info, "source")
- else:
- log.warning(
- "The providers for package '%s' could not be registered
because providers for that "
- "package name have already been registered",
- package_name,
- )
- except Exception as e:
- log.warning("Error when loading '%s'", path, exc_info=e)
-
def _discover_hooks_from_connection_types(
self,
hook_class_names_registered: set[str],
diff --git a/tests/always/test_providers_manager.py
b/tests/always/test_providers_manager.py
index a808aedbb80..670ee1037f4 100644
--- a/tests/always/test_providers_manager.py
+++ b/tests/always/test_providers_manager.py
@@ -79,7 +79,6 @@ class TestProviderManager:
providers_manager._provider_dict["test-package"] = ProviderInfo(
version="0.0.1",
data={"hook-class-names":
["airflow.providers.sftp.hooks.sftp.SFTPHook"]},
- package_or_source="package",
)
providers_manager._discover_hooks()
assert warning_records
@@ -98,7 +97,6 @@ class TestProviderManager:
}
],
},
- package_or_source="package",
)
providers_manager._discover_hooks()
assert [w.message for w in warning_records if "hook-class-names" in
str(w.message)] == []
@@ -118,7 +116,6 @@ class TestProviderManager:
}
],
},
- package_or_source="package",
)
providers_manager._discover_hooks()
_ = providers_manager._hooks_lazy_dict["wrong-connection-type"]
@@ -140,7 +137,6 @@ class TestProviderManager:
}
],
},
- package_or_source="package",
)
providers_manager._discover_hooks()
_ = providers_manager._hooks_lazy_dict["sftp"]
@@ -164,7 +160,6 @@ class TestProviderManager:
},
],
},
- package_or_source="package",
)
providers_manager._discover_hooks()
_ = providers_manager._hooks_lazy_dict["dummy"]
@@ -188,7 +183,6 @@ class TestProviderManager:
}
]
},
- package_or_source="package",
)
providers_manager._discover_plugins()
assert len(providers_manager._plugins_set) == 1
@@ -211,7 +205,6 @@ class TestProviderManager:
}
]
},
- package_or_source="package",
)
providers_manager._discover_hooks()
assert len(providers_manager._dialect_provider_dict) == 1
diff --git a/tests/api_connexion/endpoints/test_provider_endpoint.py
b/tests/api_connexion/endpoints/test_provider_endpoint.py
index 077b3b382d0..b02dbb61e03 100644
--- a/tests/api_connexion/endpoints/test_provider_endpoint.py
+++ b/tests/api_connexion/endpoints/test_provider_endpoint.py
@@ -35,7 +35,6 @@ MOCK_PROVIDERS = {
"description": "`Amazon Web Services (AWS)
<https://aws.amazon.com/>`__.\n",
"versions": ["1.0.0"],
},
- "package",
),
"apache-airflow-providers-apache-cassandra": ProviderInfo(
"1.0.0",
@@ -45,7 +44,6 @@ MOCK_PROVIDERS = {
"description": "`Apache Cassandra
<http://cassandra.apache.org/>`__.\n",
"versions": ["1.0.0"],
},
- "package",
),
}
diff --git a/tests/api_fastapi/core_api/routes/public/test_providers.py
b/tests/api_fastapi/core_api/routes/public/test_providers.py
index 6310a1251ba..8ad36e2a5ef 100644
--- a/tests/api_fastapi/core_api/routes/public/test_providers.py
+++ b/tests/api_fastapi/core_api/routes/public/test_providers.py
@@ -33,7 +33,6 @@ MOCK_PROVIDERS = {
"description": "`Amazon Web Services (AWS)
<https://aws.amazon.com/>`__.\n",
"versions": ["1.0.0"],
},
- "package",
),
"apache-airflow-providers-apache-cassandra": ProviderInfo(
"1.0.0",
@@ -43,7 +42,6 @@ MOCK_PROVIDERS = {
"description": "`Apache Cassandra
<http://cassandra.apache.org/>`__.\n",
"versions": ["1.0.0"],
},
- "package",
),
}