This is an automated email from the ASF dual-hosted git repository.
bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ab87cd02e4 Pass conn ID to ObjectStoragePath via URI (#35913)
ab87cd02e4 is described below
commit ab87cd02e4080df987a123d56cd9fa8c393a980d
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Fri Dec 1 21:03:10 2023 +0800
Pass conn ID to ObjectStoragePath via URI (#35913)
This enables an alternative ObjectStoragePath init syntax, using the
auth (user info) section of the URI to supply the conn ID instead of a
separate keyword argument. The explicit keyword argument is honored if supplied.
---
airflow/example_dags/tutorial_objectstorage.py | 2 +-
airflow/io/path.py | 10 +++++++-
airflow/io/store/__init__.py | 2 +-
.../apache-airflow/core-concepts/objectstorage.rst | 16 +++++++------
docs/apache-airflow/tutorial/objectstorage.rst | 18 ++++++++++----
tests/io/test_path.py | 28 +++++++++++++++++-----
6 files changed, 56 insertions(+), 20 deletions(-)
diff --git a/airflow/example_dags/tutorial_objectstorage.py
b/airflow/example_dags/tutorial_objectstorage.py
index 11d817400d..4660aa3c8e 100644
--- a/airflow/example_dags/tutorial_objectstorage.py
+++ b/airflow/example_dags/tutorial_objectstorage.py
@@ -43,7 +43,7 @@ aq_fields = {
}
# [START create_object_storage_path]
-base = ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")
+base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")
# [END create_object_storage_path]
diff --git a/airflow/io/path.py b/airflow/io/path.py
index f5eeb14eff..0e6f80254b 100644
--- a/airflow/io/path.py
+++ b/airflow/io/path.py
@@ -92,6 +92,7 @@ class ObjectStoragePath(CloudPath):
cls: type[PT],
*args: str | os.PathLike,
scheme: str | None = None,
+ conn_id: str | None = None,
**kwargs: typing.Any,
) -> PT:
args_list = list(args)
@@ -137,7 +138,14 @@ class ObjectStoragePath(CloudPath):
else:
args_list.insert(0, parsed_url.path)
- return cls._from_parts(args_list, url=parsed_url, **kwargs) # type:
ignore
+ # This matches the parsing logic in urllib.parse; see:
+ #
https://github.com/python/cpython/blob/46adf6b701c440e047abf925df9a75a/Lib/urllib/parse.py#L194-L203
+ userinfo, have_info, hostinfo = parsed_url.netloc.rpartition("@")
+ if have_info:
+ conn_id = conn_id or userinfo or None
+ parsed_url = parsed_url._replace(netloc=hostinfo)
+
+ return cls._from_parts(args_list, url=parsed_url, conn_id=conn_id,
**kwargs) # type: ignore
@functools.lru_cache
def __hash__(self) -> int:
diff --git a/airflow/io/store/__init__.py b/airflow/io/store/__init__.py
index 6bf40c939f..a5a4bd12dd 100644
--- a/airflow/io/store/__init__.py
+++ b/airflow/io/store/__init__.py
@@ -131,7 +131,7 @@ def attach(
if not alias:
alias = f"{protocol}-{conn_id}" if conn_id else protocol
- if store := _STORE_CACHE.get(alias, None):
+ if store := _STORE_CACHE.get(alias):
return store
_STORE_CACHE[alias] = store = ObjectStore(protocol=protocol,
conn_id=conn_id, fs=fs)
diff --git a/docs/apache-airflow/core-concepts/objectstorage.rst
b/docs/apache-airflow/core-concepts/objectstorage.rst
index 046cb48522..d72a734293 100644
--- a/docs/apache-airflow/core-concepts/objectstorage.rst
+++ b/docs/apache-airflow/core-concepts/objectstorage.rst
@@ -74,20 +74,22 @@ object you want to interact with. For example, to point to
a bucket in s3, you w
from airflow.io.path import ObjectStoragePath
- base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default") #
conn_id is optional
+ base = ObjectStoragePath("s3://aws_default@my-bucket/")
+The username part of the URI is optional. The connection ID can alternatively
be passed in as a separate keyword argument:
+
+.. code-block:: python
+
+ # Equivalent to the previous example.
+ base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default")
Listing file-objects:
.. code-block:: python
@task
- def list_files() -> list(ObjectStoragePath):
- files = []
- for f in base.iterdir():
- if f.is_file():
- files.append(f)
-
+ def list_files() -> list[ObjectStoragePath]:
+ files = [f for f in base.iterdir() if f.is_file()]
return files
diff --git a/docs/apache-airflow/tutorial/objectstorage.rst
b/docs/apache-airflow/tutorial/objectstorage.rst
index 89ffe0e8f9..610450b931 100644
--- a/docs/apache-airflow/tutorial/objectstorage.rst
+++ b/docs/apache-airflow/tutorial/objectstorage.rst
@@ -32,7 +32,7 @@ analytical database. You can do this by running ``pip install
duckdb``. The tuto
makes use of S3 Object Storage. This requires that the amazon provider is
installed
including ``s3fs`` by running ``pip install
apache-airflow-providers-amazon[s3fs]``.
If you would like to use a different storage provider, you can do so by
changing the
-url in the ``create_object_storage_path`` function to the appropriate url for
your
+URL in the ``create_object_storage_path`` function to the appropriate URL for
your
provider, for example by replacing ``s3://`` with ``gs://`` for Google Cloud
Storage.
You will also need the right provider to be installed then. Finally, you will
need
``pandas``, which can be installed by running ``pip install pandas``.
@@ -49,9 +49,19 @@ It is the fundamental building block of the Object Storage
API.
:start-after: [START create_object_storage_path]
:end-before: [END create_object_storage_path]
-The ObjectStoragePath constructor can take an optional connection id. If
supplied
-it will use the connection to obtain the right credentials to access the
backend.
-Otherwise it will revert to the default for that backend.
+The username part of the URL given to ObjectStoragePath should be a connection
ID.
+The specified connection will be used to obtain the right credentials to access
+the backend. If it is omitted, the default connection for the backend will be
used.
+
+The connection ID can alternatively be passed in with a keyword argument:
+
+.. code-block:: python
+
+ ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")
+
+This is useful when reusing a URL defined for another purpose (e.g. Dataset),
+which generally does not contain a username part. The explicit keyword argument
+takes precedence over the URL's username value if both are specified.
It is safe to instantiate an ObjectStoragePath at the root of your DAG.
Connections
will not be created until the path is used. This means that you can create the
diff --git a/tests/io/test_path.py b/tests/io/test_path.py
index 1ac263c59f..54a675360f 100644
--- a/tests/io/test_path.py
+++ b/tests/io/test_path.py
@@ -52,6 +52,13 @@ class FakeRemoteFileSystem(LocalFileSystem):
class TestFs:
+ def setup_class(self):
+ self._store_cache = _STORE_CACHE.copy()
+
+ def teardown(self):
+ _STORE_CACHE.clear()
+ _STORE_CACHE.update(self._store_cache)
+
def test_alias(self):
store = attach("file", alias="local")
assert isinstance(store.fs, LocalFileSystem)
@@ -100,6 +107,19 @@ class TestFs:
assert not o.exists()
+ @pytest.fixture()
+ def fake_fs(self):
+ fs = mock.Mock()
+ fs._strip_protocol.return_value = "/"
+ fs.conn_id = "fake"
+ return fs
+
+ def test_objectstoragepath_init_conn_id_in_uri(self, fake_fs):
+ fake_fs.stat.return_value = {"stat": "result"}
+ attach(protocol="fake", conn_id="fake", fs=fake_fs)
+ p = ObjectStoragePath("fake://fake@bucket/path")
+ assert p.stat() == {"stat": "result", "conn_id": "fake", "protocol":
"fake"}
+
@pytest.mark.parametrize(
"fn, args, fn2, path, expected_args, expected_kwargs",
[
@@ -124,12 +144,8 @@ class TestFs:
),
],
)
- def test_standard_extended_api(self, monkeypatch, fn, args, fn2, path,
expected_args, expected_kwargs):
- _fs = mock.Mock()
- _fs._strip_protocol.return_value = "/"
- _fs.conn_id = "fake"
-
- store = attach(protocol="file", conn_id="fake", fs=_fs)
+ def test_standard_extended_api(self, fake_fs, fn, args, fn2, path,
expected_args, expected_kwargs):
+ store = attach(protocol="file", conn_id="fake", fs=fake_fs)
o = ObjectStoragePath(path, conn_id="fake")
getattr(o, fn)(**args)