Taragolis commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1067135598
##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format:
str | None = "boto",
config_format=config_format,
profile=profile,
)
+
+
+class AwsBaseHookAsync(AwsBaseHook):
Review Comment:
Is it necessary to base this class on AwsBaseHook? It seems like it does not
actually use any of its methods or properties.
##########
airflow/providers/amazon/aws/hooks/redshift_cluster.py:
##########
@@ -183,3 +187,86 @@ def get_cluster_snapshot_status(self, snapshot_identifier:
str, cluster_identifi
return snapshot_status
except self.get_conn().exceptions.ClusterSnapshotNotFoundFault:
return None
+
+
+class RedshiftHookAsync(AwsBaseHookAsync):
+ """Interact with AWS Redshift using aiobotocore library"""
+
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
+ kwargs["client_type"] = "redshift"
+ kwargs["resource_type"] = "redshift"
Review Comment:
This is redundant because a `resource_type` can only be created by
`boto3`.
Also, `redshift` is not listed as a valid resource in `boto3`:
```python
Python 3.9.10 (main, Feb 25 2022, 16:54:01)
Type 'copyright', 'credits' or 'license' for more information
IPython 8.7.0 -- An enhanced Interactive Python. Type '?' for help.
PyDev console: using IPython 8.7.0
Python 3.9.10 (main, Feb 25 2022, 16:54:01)
[Clang 13.0.0 (clang-1300.0.29.30)] on darwin
import boto3
boto3.__version__
Out[3]: '1.26.43'
session = boto3.Session(region_name="us-east-1")  # All features become available first in N. Virginia
session.get_available_resources()
Out[6]:
['cloudformation',
'cloudwatch',
'dynamodb',
'ec2',
'glacier',
'iam',
'opsworks',
's3',
'sns',
'sqs']
```
##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format:
str | None = "boto",
config_format=config_format,
profile=profile,
)
+
+
+class AwsBaseHookAsync(AwsBaseHook):
+ """
+ Interacts with AWS using aiobotocore asynchronously.
+
+ .. note::
+ AwsBaseHookAsync uses aiobotocore to create asynchronous S3 hooks.
Hence, AwsBaseHookAsync
+ only supports the authentication mechanism that aiobotocore supports.
Currently, AwsBaseHookAsync supports
+ only AWS STS client method ``assume_role`` provided in the Airflow
connection extra args via aiobotocore.
+
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ If this is None or empty then the default boto3 behaviour is used. If
+ running Airflow in a distributed manner and aws_conn_id is None or
+ empty, then default boto3 configuration would be used (and must be
+ maintained on each worker node).
+ :param verify: Whether or not to verify SSL certificates.
+ :param region_name: AWS region_name. If not specified then the default
boto3 behaviour is used.
+ :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+ :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+ :param config: Configuration for botocore client.
+ .. seealso::
+ `AWS API
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
+ """
+
+ async def get_client_async(self) -> AioBaseClient:
+ """Create an Async Client object to communicate with AWS services."""
+
+ connection_object = await
sync_to_async(self.get_connection)(self.aws_conn_id)
+
+ conn_config = AwsConnectionWrapper(
+ conn=connection_object,
+ region_name=self.region_name,
+ botocore_config=self.config,
+ verify=self.verify,
+ )
+
+ async_connection = get_session()
+ session_token = conn_config.aws_session_token
+ aws_secret = conn_config.aws_secret_access_key
+ aws_access = conn_config.aws_access_key_id
+ if conn_config.role_arn:
+ credentials = await self.get_role_credentials(
+ async_session=async_connection, conn_config=conn_config
+ )
+ if credentials:
+ session_token = credentials["SessionToken"]
+ aws_access = credentials["AccessKeyId"]
+ aws_secret = credentials["SecretAccessKey"]
+ return async_connection.create_client(
+ service_name=self.client_type,
+ region_name=conn_config.region_name,
+ aws_secret_access_key=aws_secret,
+ aws_access_key_id=aws_access,
+ aws_session_token=session_token,
+ verify=self.verify,
+ config=self.config,
+ endpoint_url=conn_config.endpoint_url,
+ )
+
+ @staticmethod
+ async def get_role_credentials(
+ async_session: AioSession, conn_config: AwsConnectionWrapper
+ ) -> Optional[Dict[str, str]]:
+ """Get the role_arn, method credentials from connection details and
get the role credentials detail"""
+ async with async_session.create_client(
+ "sts",
+ aws_access_key_id=conn_config.aws_access_key_id,
+ aws_secret_access_key=conn_config.aws_secret_access_key,
+ ) as client:
Review Comment:
What if the user provides `profile_name` or `aws_session_token`?
##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format:
str | None = "boto",
config_format=config_format,
profile=profile,
)
+
+
+class AwsBaseHookAsync(AwsBaseHook):
+ """
+ Interacts with AWS using aiobotocore asynchronously.
+
+ .. note::
+ AwsBaseHookAsync uses aiobotocore to create asynchronous S3 hooks.
Hence, AwsBaseHookAsync
+ only supports the authentication mechanism that aiobotocore supports.
Currently, AwsBaseHookAsync supports
+ only AWS STS client method ``assume_role`` provided in the Airflow
connection extra args via aiobotocore.
+
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ If this is None or empty then the default boto3 behaviour is used. If
+ running Airflow in a distributed manner and aws_conn_id is None or
+ empty, then default boto3 configuration would be used (and must be
+ maintained on each worker node).
+ :param verify: Whether or not to verify SSL certificates.
+ :param region_name: AWS region_name. If not specified then the default
boto3 behaviour is used.
+ :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+ :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+ :param config: Configuration for botocore client.
+ .. seealso::
+ `AWS API
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
+ """
+
+ async def get_client_async(self) -> AioBaseClient:
+ """Create an Async Client object to communicate with AWS services."""
+
+ connection_object = await
sync_to_async(self.get_connection)(self.aws_conn_id)
+
Review Comment:
This is inconsistent with `AwsBaseHook`:
1. If the user provides `None`, `AwsBaseHook` uses the default `boto3` (strictly speaking, `botocore`)
credentials strategy (environment variables, EC2/ECS metadata, etc.).
2. If the user provides an invalid `aws_conn_id`, `AwsBaseHook` falls back to the default
`boto3` credentials strategy.
https://github.com/apache/airflow/blob/352d492c66e69e816fb1547e46fc1e3b7ba32066/airflow/providers/amazon/aws/hooks/base_aws.py#L507-L526
##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format:
str | None = "boto",
config_format=config_format,
profile=profile,
)
+
+
+class AwsBaseHookAsync(AwsBaseHook):
+ """
+ Interacts with AWS using aiobotocore asynchronously.
+
+ .. note::
+ AwsBaseHookAsync uses aiobotocore to create asynchronous S3 hooks.
Hence, AwsBaseHookAsync
+ only supports the authentication mechanism that aiobotocore supports.
Currently, AwsBaseHookAsync supports
+ only AWS STS client method ``assume_role`` provided in the Airflow
connection extra args via aiobotocore.
+
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ If this is None or empty then the default boto3 behaviour is used. If
+ running Airflow in a distributed manner and aws_conn_id is None or
+ empty, then default boto3 configuration would be used (and must be
+ maintained on each worker node).
+ :param verify: Whether or not to verify SSL certificates.
+ :param region_name: AWS region_name. If not specified then the default
boto3 behaviour is used.
+ :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+ :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+ :param config: Configuration for botocore client.
+ .. seealso::
+ `AWS API
<https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
+ """
+
+ async def get_client_async(self) -> AioBaseClient:
+ """Create an Async Client object to communicate with AWS services."""
+
+ connection_object = await
sync_to_async(self.get_connection)(self.aws_conn_id)
+
+ conn_config = AwsConnectionWrapper(
+ conn=connection_object,
+ region_name=self.region_name,
+ botocore_config=self.config,
+ verify=self.verify,
+ )
+
+ async_connection = get_session()
Review Comment:
I think we could use `AioSession()` here instead, which would also let us pass in
some useful configuration.
Alternatively, we could create an `AsyncBaseSessionFactory` class that takes care of
everything, the way `BaseSessionFactory` does.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]