This is an automated email from the ASF dual-hosted git repository.
bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6c94ddf2bc Improve handling of backwards compat for airflow.io (#36199)
6c94ddf2bc is described below
commit 6c94ddf2bc123bfc7a59df4ce05f2b4e980f7a15
Author: Bolke de Bruin <[email protected]>
AuthorDate: Wed Dec 13 18:56:30 2023 +0100
Improve handling of backwards compat for airflow.io (#36199)
Older providers do not have a get_fs method that takes
storage_options as an argument. If we encounter such a provider
and storage_options are passed, we should error out instead
of silently ignoring them.
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/io/__init__.py | 7 +++++++
tests/io/test_path.py | 21 +++++++++++++++++++++
2 files changed, 28 insertions(+)
diff --git a/airflow/io/__init__.py b/airflow/io/__init__.py
index b2658b0ee4..9996a77717 100644
--- a/airflow/io/__init__.py
+++ b/airflow/io/__init__.py
@@ -93,11 +93,18 @@ def get_fs(
raise ValueError(f"No filesystem registered for scheme {scheme}") from
None
options = storage_options or {}
+
# MyPy does not recognize dynamic parameters inspection when we call the
method, and we have to do
# it for compatibility reasons with already released providers, that's why
we need to ignore
# mypy errors here
parameters = inspect.signature(fs).parameters
if len(parameters) == 1:
+ if options:
+ raise AttributeError(
+ f"Filesystem {scheme} does not support storage options, but
options were passed."
+ f"This most likely means that you are using an old version of
the provider that does not "
+ f"support storage options. Please upgrade the provider if
possible."
+ )
return fs(conn_id) # type: ignore[call-arg]
return fs(conn_id, options) # type: ignore[call-arg]
diff --git a/tests/io/test_path.py b/tests/io/test_path.py
index 58478a0285..7c97c7b2b9 100644
--- a/tests/io/test_path.py
+++ b/tests/io/test_path.py
@@ -26,6 +26,7 @@ import pytest
from fsspec.implementations.local import LocalFileSystem
from fsspec.utils import stringify_path
+from airflow.io import _register_filesystems, get_fs
from airflow.io.path import ObjectStoragePath
from airflow.io.store import _STORE_CACHE, ObjectStore, attach
from airflow.utils.module_loading import qualname
@@ -51,6 +52,10 @@ class FakeRemoteFileSystem(LocalFileSystem):
return path[i + 3 :] if i > 0 else path
+def get_fs_no_storage_options(_: str):
+ return LocalFileSystem()
+
+
class TestFs:
def setup_class(self):
self._store_cache = _STORE_CACHE.copy()
@@ -285,3 +290,19 @@ class TestFs:
assert s["conn_id"] is None
assert s["filesystem"] == qualname(LocalFileSystem)
assert store == d
+
+ def test_backwards_compat(self):
+ _register_filesystems.cache_clear()
+ from airflow.io import _BUILTIN_SCHEME_TO_FS as SCHEMES
+
+ try:
+ SCHEMES["file"] = get_fs_no_storage_options # type:
ignore[call-arg]
+
+ assert get_fs("file")
+
+ with pytest.raises(AttributeError):
+ get_fs("file", storage_options={"foo": "bar"})
+
+ finally:
+ # Reset the cache to avoid side effects
+ _register_filesystems.cache_clear()