arollet-decathlon opened a new issue, #39970:
URL: https://github.com/apache/airflow/issues/39970
### Apache Airflow Provider(s)
amazon
### Versions of Apache Airflow Providers
8.16.0
### Apache Airflow version
2.8.1
### Operating System
Amazon Linux 2023.4.20240513
### Deployment
Amazon (AWS) MWAA
### Deployment details
Also tested on a standalone instance, which leads to the same error.
### What happened
When trying to read a file through Object Storage with code like this:
```python
base = ObjectStoragePath("s3://bucket/", conn_id="aws_test")
path = base / "path/to/key"
with path.open() as f:
print(f.read())
```
I get a strange traceback when the 'aws_test' connection is configured with a
role (`role_arn` in the connection extras).
```
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 241, in execute
    return_value = super().execute(context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 199, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 216, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/dps/example_dags/s3_object_storage.py", line 39, in read_file
    with path.open() as f:
         ^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/upath/core.py", line 324, in open
    return self._accessor.open(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/upath/core.py", line 63, in open
    return self._fs.open(self._format_path(path), mode, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1283, in open
    self.open(
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1295, in open
    f = self._open(
        ^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 671, in _open
    return S3File(
           ^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 2110, in __init__
    super().__init__(
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1651, in __init__
    self.size = self.details["size"]
                ^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1664, in details
    self._details = self.fs.info(self.path)
                    ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/asyn.py", line 103, in sync
    raise return_result
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
                ^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 1302, in _info
    out = await self._call_s3(
          ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 341, in _call_s3
    await self.set_session()
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 524, in set_session
    s3creator = self.session.create_client(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'Session' object has no attribute 'create_client'
```
If the 'aws_test' connection is set with an Access Key ID and Secret Access
Key instead, I do not get the error.
### What you think should happen instead
ObjectStoragePath should support AWS connections configured with a role,
especially since assuming a role is more robust from a security standpoint
than long-lived access keys.
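Since access-key connections do work, one interim workaround (a sketch under my own assumptions — the function names, env var name, and session name below are hypothetical, not part of any Airflow API) is to assume the role once via STS and expose the resulting temporary credentials as a plain key/secret Airflow connection:

```python
import json
import os
from urllib.parse import quote_plus


def build_aws_conn_uri(access_key: str, secret_key: str, session_token: str) -> str:
    """Build an Airflow `aws://` connection URI from explicit credentials,
    passing the session token through the connection extras."""
    extra = {"aws_session_token": session_token}
    return (
        f"aws://{quote_plus(access_key)}:{quote_plus(secret_key)}@"
        f"?__extra__={quote_plus(json.dumps(extra))}"
    )


def export_assumed_role_conn(role_arn: str, env_var: str = "AIRFLOW_CONN_AWS_ASSUMED") -> None:
    """Assume `role_arn` via STS and expose the temporary keys as an
    access-key connection, which does not trigger the traceback above.
    Note: temporary credentials expire, so this only suits short tasks."""
    import boto3  # assumed available alongside the Amazon provider

    creds = boto3.client("sts").assume_role(
        RoleArn=role_arn, RoleSessionName="objectstorage-workaround"
    )["Credentials"]
    os.environ[env_var] = build_aws_conn_uri(
        creds["AccessKeyId"], creds["SecretAccessKey"], creds["SessionToken"]
    )
```

With this in place, `ObjectStoragePath("s3://bucket/", conn_id="aws_assumed")` would follow the access-key code path that is confirmed to work above.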
### How to reproduce
```python
from datetime import datetime
import json
import os
from urllib.parse import quote_plus

from airflow import DAG
from airflow.decorators import task
from airflow.io.path import ObjectStoragePath
from airflow.providers.amazon.aws.hooks.sts import StsHook

DAG_ID = "minimal_objectstorage_s3"

# Setting the AWS connection with a role
AWS_ROLE = "ROLE-ARN-XXXXXXXXXXXXXXXXX"
EXTRA = {"role_arn": AWS_ROLE}
os.environ["AIRFLOW_CONN_AWS_DEFAULT2"] = f"aws://?__extra__={quote_plus(json.dumps(EXTRA))}"

# Setting the AWS connection with access keys (no error in this case)
# ACCESS_KEY = "AAAAAAAAA"
# SECRET_KEY = "BBBBBBBBBB"
# os.environ["AIRFLOW_CONN_AWS_CONN"] = f"aws://{ACCESS_KEY}:{SECRET_KEY}@"

base = ObjectStoragePath("s3://bucket/", conn_id="aws_default2")

# DAG definition:
with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    @task
    def read_file(path: ObjectStoragePath) -> str:
        with path.open() as f:
            return f.read()

    read_file(base / "path/to/key")

    @task
    def aws_identity():
        hook = StsHook(aws_conn_id="aws_default2")
        print(hook.conn.get_caller_identity())

    aws_identity()
```
### Anything else
Happens every time.
The error seems to lie somewhere between the Airflow, upath, and s3fs
packages, but I am not able to pinpoint it in their Python code.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)