This is an automated email from the ASF dual-hosted git repository.
bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5cf2560cce Some improvements to Airflow IO code (#36259)
5cf2560cce is described below
commit 5cf2560cce22a8db1f44c0bcca118b8d314f3cf7
Author: Hussein Awala <[email protected]>
AuthorDate: Thu Jan 4 14:42:31 2024 +0100
Some improvements to Airflow IO code (#36259)
---
airflow/io/store/__init__.py | 30 ++++++++++++++++--------------
tests/io/test_path.py | 2 +-
2 files changed, 17 insertions(+), 15 deletions(-)
diff --git a/airflow/io/store/__init__.py b/airflow/io/store/__init__.py
index bc9e0cd87d..d1e897c5de 100644
--- a/airflow/io/store/__init__.py
+++ b/airflow/io/store/__init__.py
@@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations
+from contextlib import suppress
+from functools import cached_property
from typing import TYPE_CHECKING, ClassVar
from airflow.io import get_fs, has_fs
@@ -37,8 +39,6 @@ class ObjectStore:
protocol: str
storage_options: Properties | None
- _fs: AbstractFileSystem | None = None
-
def __init__(
self,
protocol: str,
@@ -48,15 +48,17 @@ class ObjectStore:
):
self.conn_id = conn_id
self.protocol = protocol
- self._fs = fs
+ if fs is not None:
+ self.fs = fs
self.storage_options = storage_options
def __str__(self):
return f"{self.protocol}-{self.conn_id}" if self.conn_id else self.protocol
- @property
+ @cached_property
def fs(self) -> AbstractFileSystem:
- return self._connect()
+ # if the fs is provided in init, the next statement will be ignored
+ return get_fs(self.protocol, self.conn_id)
@property
def fsid(self) -> str:
@@ -68,9 +70,8 @@ class ObjectStore:
:return: deterministic the filesystem ID
"""
- fs = self._connect()
try:
- return fs.fsid
+ return self.fs.fsid
except NotImplementedError:
return f"{self.fs.protocol}-{self.conn_id or 'env'}"
@@ -78,7 +79,7 @@ class ObjectStore:
return {
"protocol": self.protocol,
"conn_id": self.conn_id,
- "filesystem": qualname(self._fs) if self._fs else None,
+ "filesystem": qualname(self.fs) if self.fs else None,
"storage_options": self.storage_options,
}
@@ -103,13 +104,14 @@ class ObjectStore:
return attach(protocol=protocol, conn_id=conn_id, storage_options=data["storage_options"])
- def _connect(self) -> AbstractFileSystem:
- if self._fs is None:
- self._fs = get_fs(self.protocol, self.conn_id)
- return self._fs
-
def __eq__(self, other):
- return isinstance(other, type(self)) and other.conn_id == self.conn_id and other._fs == self._fs
+ self_fs = None
+ other_fs = None
+ with suppress(ValueError):
+ self_fs = self.fs
+ with suppress(ValueError):
+ other_fs = other.fs
+ return isinstance(other, type(self)) and other.conn_id == self.conn_id and self_fs == other_fs
_STORE_CACHE: dict[str, ObjectStore] = {}
diff --git a/tests/io/test_path.py b/tests/io/test_path.py
index 7c97c7b2b9..af6044c150 100644
--- a/tests/io/test_path.py
+++ b/tests/io/test_path.py
@@ -279,7 +279,7 @@ class TestFs:
assert s["protocol"] == "file"
assert s["conn_id"] == "mock"
- assert s["filesystem"] is None
+ assert s["filesystem"] == qualname(LocalFileSystem)
assert store == d
store = attach("localfs", fs=LocalFileSystem())