rishi-kulkarni opened a new issue, #32732:
URL: https://github.com/apache/airflow/issues/32732

   ### Apache Airflow version
   
   2.6.3
   
   ### What happened
   
   Hi all, I'm having a bit of a problem with aiobotocore and the deferrable 
AWS Batch Operator. When deferrable is off, everything works fine, but for some 
very long running batch jobs I wanted to try out the async option. Example DAG:
   from airflow.decorators import dag
   from airflow.providers.amazon.aws.operators.batch import BatchOperator
   
   from datetime import datetime, timedelta
   
   default_args = {
       "owner": "rkulkarni",
       ...
   }
   
   ```
   @dag(
       default_args=default_args,
       catchup=False,
       schedule="0 1/8 * * *",
   )
   def batch_job_to_do():
       submit_batch_job = BatchOperator(
           task_id="submit_batch_job",
           job_name="job_name",
           job_queue="job_queue",
           job_definition="job_definition:1",
           overrides={},
           aws_conn_id="aws_prod_batch",
           region_name="us-east-1",
           awslogs_enabled=True,
           awslogs_fetch_interval=timedelta(seconds=30),
           deferrable=True
       )
   
       submit_batch_job  # type: ignore
   
   
   batch_job_to_do()
   ```
   And, for reference, this is running in an EC2 instance in one account that 
assumes a role in another account via STS to submit the job. Again, this all 
works fine when deferrable=False
   If deferrable=True, however, the DAG works properly until it wakes up the 
first time. 
   
   I've identified the cause of this error: 
https://github.com/apache/airflow/blob/15d42b4320d535cf54743929f134e36f59c615bb/airflow/providers/amazon/aws/hooks/base_aws.py#L211
 and a related error: 
https://github.com/apache/airflow/blob/15d42b4320d535cf54743929f134e36f59c615bb/airflow/providers/amazon/aws/hooks/base_aws.py#L204
   
   These should be creating `aiobotocore.credentials.AioRefreshableCredentials` 
and  `aiobotocore.credentials.AioDeferredRefreshableCredentials`, respectively. 
Annoyingly, these classes are built using a slightly different metadata format 
than the botocore versions (dict keys are snake case rather than Pascal case), 
but I can confirm that replacing `session._session._credentials` attribute with 
these fixes the above error. 
   
   I'm happy to submit a PR to resolve this. 
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   Attempt to use STS-based authentication with a deferrable AWS operator (any 
operator) and it will produce the below error:
   
   
   ```
   Traceback (most recent call last):
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py",
 line 537, in cleanup_finished_triggers
       result = details["task"].result()
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py",
 line 615, in run_trigger
       async for event in trigger.run():
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/airflow/providers/amazon/aws/triggers/base.py",
 line 121, in run
       await async_wait(
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/airflow/providers/amazon/aws/utils/waiter_with_logging.py",
 line 122, in async_wait
       await waiter.wait(**args, WaiterConfig={"MaxAttempts": 1})
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/waiter.py", 
line 49, in wait
       await AIOWaiter.wait(self, **kwargs)
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/waiter.py", 
line 94, in wait
       response = await self._operation_method(**kwargs)
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/waiter.py", 
line 77, in __call__
       return await self._client_method(**kwargs)
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/client.py", 
line 361, in _make_api_call
       http, parsed_response = await self._make_request(
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/client.py", 
line 386, in _make_request
       return await self._endpoint.make_request(
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/endpoint.py",
 line 96, in _send_request
       request = await self.create_request(request_dict, operation_model)
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/endpoint.py",
 line 84, in create_request
       await self._event_emitter.emit(
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/hooks.py", 
line 66, in _emit
       response = await resolve_awaitable(handler(**kwargs))
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/_helpers.py",
 line 15, in resolve_awaitable
       return await obj
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/signers.py", 
line 24, in handler
       return await self.sign(operation_name, request)
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/signers.py", 
line 73, in sign
       auth = await self.get_auth_instance(**kwargs)
     File 
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/signers.py", 
line 147, in get_auth_instance
       await self._credentials.get_frozen_credentials()
   TypeError: object ReadOnlyCredentials can't be used in 'await' expression
   ```
   
   ### Operating System
   
   AmazonLinux2
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to