arollet-decathlon opened a new issue, #39970:
URL: https://github.com/apache/airflow/issues/39970

   ### Apache Airflow Provider(s)
   
   amazon
   
   ### Versions of Apache Airflow Providers
   
   8.16.0
   
   ### Apache Airflow version
   
   2.8.1
   
   ### Operating System
   
   Amazon Linux 2023.4.20240513
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   Also tested on a standalone instance, which leads to the same error.
   
   ### What happened
   
   When trying to read a file with Object Storage, using code like this:
   
   ```python
   base = ObjectStoragePath("s3://bucket/", conn_id="aws_test")
   path = base / "path/to/key"
   with path.open() as f:
       print(f.read())
   ```
   I get a strange traceback when the 'aws_test' connection is configured with a role_arn.
   
   ```
   Traceback (most recent call last):
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
       result = execute_callable(context=context, **execute_callable_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 241, in execute
       return_value = super().execute(context)
                      ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 199, in execute
       return_value = self.execute_callable()
                      ^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 216, in execute_callable
       return self.python_callable(*self.op_args, **self.op_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/airflow/dags/dps/example_dags/s3_object_storage.py", line 39, in read_file
       with path.open() as f:
            ^^^^^^^^^^^
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/upath/core.py", line 324, in open
       return self._accessor.open(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/upath/core.py", line 63, in open
       return self._fs.open(self._format_path(path), mode, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1283, in open
       self.open(
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1295, in open
       f = self._open(
           ^^^^^^^^^^^
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 671, in _open
       return S3File(
              ^^^^^^^
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 2110, in __init__
       super().__init__(
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1651, in __init__
       self.size = self.details["size"]
                   ^^^^^^^^^^^^
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1664, in details
       self._details = self.fs.info(self.path)
                       ^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/asyn.py", line 118, in wrapper
       return sync(self.loop, func, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/asyn.py", line 103, in sync
       raise return_result
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/asyn.py", line 56, in _runner
       result[0] = await coro
                   ^^^^^^^^^^
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 1302, in _info
       out = await self._call_s3(
             ^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 341, in _call_s3
       await self.set_session()
     File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 524, in set_session
       s3creator = self.session.create_client(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^
   AttributeError: 'Session' object has no attribute 'create_client'
   ```
   
   If the 'aws_test' connection is set with an Access Key ID and Secret Access Key instead, I do not get the error.
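   As a stopgap, assuming the role myself with STS and handing the temporary credentials straight to s3fs (bypassing the Airflow connection) avoids the broken code path. A rough sketch of that workaround; the role ARN and session name are placeholders, and the small helper is just something I wrote for illustration, not part of Airflow or s3fs:

```python
def s3fs_kwargs_from_assumed_role(assume_role_response: dict) -> dict:
    """Map an STS AssumeRole response onto s3fs credential kwargs.

    Illustrative helper only: picks the temporary credentials out of the
    response and renames them to the key/secret/token parameters that
    s3fs.S3FileSystem accepts.
    """
    creds = assume_role_response["Credentials"]
    return {
        "key": creds["AccessKeyId"],
        "secret": creds["SecretAccessKey"],
        "token": creds["SessionToken"],
    }

# Usage sketch (requires valid AWS credentials; the ARN is a placeholder):
# import boto3, s3fs
# resp = boto3.client("sts").assume_role(
#     RoleArn="ROLE-ARN-XXXXXXXXXXXXXXXXX",
#     RoleSessionName="airflow-objectstorage",
# )
# fs = s3fs.S3FileSystem(**s3fs_kwargs_from_assumed_role(resp))
# with fs.open("bucket/path/to/key") as f:
#     print(f.read())
```

   This is only a workaround for my own DAGs, not a fix for the provider.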
   
   ### What you think should happen instead
   
   ObjectStoragePath should support AWS connections configured with a role_arn, especially since assuming a role is a more robust approach, from a security point of view, than long-lived access keys.
   
   ### How to reproduce
   
   ```python
   import json
   import os
   from datetime import datetime
   from urllib.parse import quote_plus
   
   from airflow import DAG
   from airflow.decorators import task
   from airflow.io.path import ObjectStoragePath
   
   from airflow.providers.amazon.aws.hooks.sts import StsHook
   
   DAG_ID = "minimal_objectstorage_s3"
   
   # Setting aws connection with role
   AWS_ROLE = "ROLE-ARN-XXXXXXXXXXXXXXXXX"
   EXTRA = {"role_arn": AWS_ROLE}
   os.environ['AIRFLOW_CONN_AWS_DEFAULT2'] = f"aws://?__extra__={quote_plus(json.dumps(EXTRA))}"
   
   # Setting aws connection with Access Keys
   # ACCESS_KEY = "AAAAAAAAA"
   # SECRET_KEY = "BBBBBBBBBB"
   # os.environ['AIRFLOW_CONN_AWS_CONN'] = f"aws://{ACCESS_KEY}:{SECRET_KEY}@"
   
   
   base = ObjectStoragePath("s3://bucket/", conn_id="aws_default2")
   
   # dag definition:
   with DAG(
       dag_id=DAG_ID,
       start_date=datetime(2021, 1, 1),
       schedule=None,
       catchup=False,
   ) as dag:
   
       @task
       def read_file(path: ObjectStoragePath) -> str:
   
           with path.open() as f:
               return f.read()
   
       read_file(base / "path/to/key")
   
       @task
       def aws_identity():
           hook = StsHook(
               aws_conn_id='aws_default2'
           )
   
           print(hook.conn.get_caller_identity())
   
       aws_identity()
   
   ```
   
   ### Anything else
   
   Happens every time.
   The error seems to lie somewhere between Airflow and the upath or s3fs packages, but I'm not able to pin it down in their code.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

