rishi-kulkarni opened a new issue, #32732:
URL: https://github.com/apache/airflow/issues/32732
### Apache Airflow version
2.6.3
### What happened
Hi all, I'm having a bit of a problem with aiobotocore and the deferrable
AWS Batch Operator. When deferrable is off, everything works fine, but for some
very long running batch jobs I wanted to try out the async option. Example DAG:
from airflow.decorators import dag
from airflow.providers.amazon.aws.operators.batch import BatchOperator
from datetime import datetime, timedelta
default_args = {
"owner": "rkulkarni",
...
}
```
@dag(
default_args=default_args,
catchup=False,
schedule="0 1/8 * * *",
)
def batch_job_to_do():
submit_batch_job = BatchOperator(
task_id="submit_batch_job",
job_name="job_name",
job_queue="job_queue",
job_definition="job_definition:1",
overrides={},
aws_conn_id="aws_prod_batch",
region_name="us-east-1",
awslogs_enabled=True,
awslogs_fetch_interval=timedelta(seconds=30),
deferrable=True
)
submit_batch_job # type: ignore
batch_job_to_do()
```
And, for reference, this is running in an EC2 instance in one account that
assumes a role in another account via STS to submit the job. Again, this all
works fine when deferrable=False
If deferrable=True, however, the DAG works properly until it wakes up the
first time.
I've identified the cause of this error:
https://github.com/apache/airflow/blob/15d42b4320d535cf54743929f134e36f59c615bb/airflow/providers/amazon/aws/hooks/base_aws.py#L211
and a related error:
https://github.com/apache/airflow/blob/15d42b4320d535cf54743929f134e36f59c615bb/airflow/providers/amazon/aws/hooks/base_aws.py#L204
These should be creating `aiobotocore.credentials.AioRefreshableCredentials`
and `aiobotocore.credentials.AioDeferredRefreshableCredentials`, respectively.
Annoyingly, these classes are built using a slightly different metadata format
than the botocore versions (dict keys are snake case rather than Pascal case),
but I can confirm that replacing `session._session._credentials` attribute with
these fixes the above error.
I'm happy to submit a PR to resolve this.
### What you think should happen instead
_No response_
### How to reproduce
Attempt to use STS-based authentication with a deferrable AWS operator (any
operator) and it will produce the below error:
```
Traceback (most recent call last):
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py",
line 537, in cleanup_finished_triggers
result = details["task"].result()
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py",
line 615, in run_trigger
async for event in trigger.run():
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/airflow/providers/amazon/aws/triggers/base.py",
line 121, in run
await async_wait(
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/airflow/providers/amazon/aws/utils/waiter_with_logging.py",
line 122, in async_wait
await waiter.wait(**args, WaiterConfig={"MaxAttempts": 1})
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/waiter.py",
line 49, in wait
await AIOWaiter.wait(self, **kwargs)
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/waiter.py",
line 94, in wait
response = await self._operation_method(**kwargs)
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/waiter.py",
line 77, in __call__
return await self._client_method(**kwargs)
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/client.py",
line 361, in _make_api_call
http, parsed_response = await self._make_request(
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/client.py",
line 386, in _make_request
return await self._endpoint.make_request(
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/endpoint.py",
line 96, in _send_request
request = await self.create_request(request_dict, operation_model)
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/endpoint.py",
line 84, in create_request
await self._event_emitter.emit(
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/hooks.py",
line 66, in _emit
response = await resolve_awaitable(handler(**kwargs))
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/_helpers.py",
line 15, in resolve_awaitable
return await obj
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/signers.py",
line 24, in handler
return await self.sign(operation_name, request)
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/signers.py",
line 73, in sign
auth = await self.get_auth_instance(**kwargs)
File
"/home/airflow/dagger/venv/lib/python3.9/site-packages/aiobotocore/signers.py",
line 147, in get_auth_instance
await self._credentials.get_frozen_credentials()
TypeError: object ReadOnlyCredentials can't be used in 'await' expression
```
### Operating System
AmazonLinux2
### Versions of Apache Airflow Providers
_No response_
### Deployment
Virtualenv installation
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]