This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-8-test by this push:
new 34791d72d6 Fix compatibility of updated `airflow.io` with released
providers (#36186)
34791d72d6 is described below
commit 34791d72d66be12e4b4c4c4ad10219402f752b1c
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Dec 12 17:47:11 2023 +0100
Fix compatibility of updated `airflow.io` with released providers (#36186)
The released providers added support to previous version of the
`airflow.io` - where options were not passed to `get_fs` method
that provides Fsspec compatible FileSystem. However #35820 added
positional "options" parameter when the method is called and it
broke already released providers.
This PR dynamically inspects signature of the get_fs method
and when one parameter is detected, it will skip passing options
to get_fs method call.
(cherry picked from commit 7799c51e15d3c418874fc059d8989f296b7f1a63)
---
airflow/io/__init__.py | 17 +++++++++++++++--
1 file changed, 15 insertions(+), 2 deletions(-)
diff --git a/airflow/io/__init__.py b/airflow/io/__init__.py
index b26d169090..b2658b0ee4 100644
--- a/airflow/io/__init__.py
+++ b/airflow/io/__init__.py
@@ -16,10 +16,12 @@
# under the License.
from __future__ import annotations
+import inspect
import logging
from typing import (
TYPE_CHECKING,
Callable,
+ Mapping,
)
from fsspec.implementations.local import LocalFileSystem
@@ -49,7 +51,12 @@ _BUILTIN_SCHEME_TO_FS: dict[str, Callable[[str | None,
Properties], AbstractFile
@cache
-def _register_filesystems() -> dict[str, Callable[[str | None, Properties],
AbstractFileSystem]]:
+def _register_filesystems() -> (
+ Mapping[
+ str,
+ Callable[[str | None, Properties], AbstractFileSystem] | Callable[[str
| None], AbstractFileSystem],
+ ]
+):
scheme_to_fs = _BUILTIN_SCHEME_TO_FS.copy()
with Stats.timer("airflow.io.load_filesystems") as timer:
manager = ProvidersManager()
@@ -86,7 +93,13 @@ def get_fs(
raise ValueError(f"No filesystem registered for scheme {scheme}") from
None
options = storage_options or {}
- return fs(conn_id, options)
+ # MyPy does not recognize dynamic parameters inspection when we call the
method, and we have to do
+ # it for compatibility reasons with already released providers, that's why
we need to ignore
+ # mypy errors here
+ parameters = inspect.signature(fs).parameters
+ if len(parameters) == 1:
+ return fs(conn_id) # type: ignore[call-arg]
+ return fs(conn_id, options) # type: ignore[call-arg]
def has_fs(scheme: str) -> bool: