Taragolis commented on code in PR #28850:
URL: https://github.com/apache/airflow/pull/28850#discussion_r1067135598


##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):

Review Comment:
   Is it necessary to base this class on `AwsBaseHook`? It does not seem to actually use any of its methods or properties.



##########
airflow/providers/amazon/aws/hooks/redshift_cluster.py:
##########
@@ -183,3 +187,86 @@ def get_cluster_snapshot_status(self, snapshot_identifier: str, cluster_identifi
             return snapshot_status
         except self.get_conn().exceptions.ClusterSnapshotNotFoundFault:
             return None
+
+
+class RedshiftHookAsync(AwsBaseHookAsync):
+    """Interact with AWS Redshift using aiobotocore library"""
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        kwargs["client_type"] = "redshift"
+        kwargs["resource_type"] = "redshift"

Review Comment:
   Setting `resource_type` is redundant here, because a `resource_type` can only refer to a resource `boto3` actually exposes, and `redshift` is not listed as a valid resource in `boto3`:
   
   ```python
   Python 3.9.10 (main, Feb 25 2022, 16:54:01) 
   Type 'copyright', 'credits' or 'license' for more information
   IPython 8.7.0 -- An enhanced Interactive Python. Type '?' for help.
   PyDev console: using IPython 8.7.0
   Python 3.9.10 (main, Feb 25 2022, 16:54:01) 
   [Clang 13.0.0 (clang-1300.0.29.30)] on darwin
   
   import boto3
   boto3.__version__
   Out[3]: '1.26.43'
   
   session = boto3.session.Session(region_name="us-east-1")  # All features become available first in N.Virginia
   session.get_available_resources()
   
   Out[6]: 
   ['cloudformation',
    'cloudwatch',
    'dynamodb',
    'ec2',
    'glacier',
    'iam',
    'opsworks',
    's3',
    'sns',
    'sqs']
   ```
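   If a guard were wanted, a hypothetical check against the session's advertised resources might look like the sketch below (`validate_resource_type` is not an existing helper; in practice `available` would come from `session.get_available_resources()`):

```python
def validate_resource_type(resource_type, available):
    """Hypothetical guard: fail fast when resource_type is not an actual
    boto3 resource (e.g. 'redshift' is client-only)."""
    if resource_type not in available:
        raise ValueError(
            f"{resource_type!r} is not a supported boto3 resource; "
            f"choose one of {sorted(available)} or use a client instead."
        )
```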



##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):
+    """
+    Interacts with AWS using aiobotocore asynchronously.
+
+    .. note::
+        AwsBaseHookAsync uses aiobotocore to create asynchronous S3 hooks. Hence, AwsBaseHookAsync
+        only supports the authentication mechanism that aiobotocore supports. Currently, AwsBaseHookAsync supports
+        only AWS STS client method ``assume_role`` provided in the Airflow connection extra args via aiobotocore.
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param verify: Whether or not to verify SSL certificates.
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+    :param config: Configuration for botocore client.
+    .. seealso::
+        `AWS API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
+    """
+
+    async def get_client_async(self) -> AioBaseClient:
+        """Create an Async Client object to communicate with AWS services."""
+
+        connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)
+
+        conn_config = AwsConnectionWrapper(
+            conn=connection_object,
+            region_name=self.region_name,
+            botocore_config=self.config,
+            verify=self.verify,
+        )
+
+        async_connection = get_session()
+        session_token = conn_config.aws_session_token
+        aws_secret = conn_config.aws_secret_access_key
+        aws_access = conn_config.aws_access_key_id
+        if conn_config.role_arn:
+            credentials = await self.get_role_credentials(
+                async_session=async_connection, conn_config=conn_config
+            )
+            if credentials:
+                session_token = credentials["SessionToken"]
+                aws_access = credentials["AccessKeyId"]
+                aws_secret = credentials["SecretAccessKey"]
+        return async_connection.create_client(
+            service_name=self.client_type,
+            region_name=conn_config.region_name,
+            aws_secret_access_key=aws_secret,
+            aws_access_key_id=aws_access,
+            aws_session_token=session_token,
+            verify=self.verify,
+            config=self.config,
+            endpoint_url=conn_config.endpoint_url,
+        )
+
+    @staticmethod
+    async def get_role_credentials(
+        async_session: AioSession, conn_config: AwsConnectionWrapper
+    ) -> Optional[Dict[str, str]]:
+        """Get the role_arn, method credentials from connection details and 
get the role credentials detail"""
+        async with async_session.create_client(
+            "sts",
+            aws_access_key_id=conn_config.aws_access_key_id,
+            aws_secret_access_key=conn_config.aws_secret_access_key,
+        ) as client:

Review Comment:
   What if the user provides `profile_name` or `aws_session_token`?
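   A rough sketch of how those fields could be forwarded to the STS call (all names here are hypothetical; `ConnConfig` merely stands in for `AwsConnectionWrapper`). Note that a profile cannot be passed to `create_client` at all; it would have to be set on the session itself:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConnConfig:
    """Stand-in for AwsConnectionWrapper; only the fields used below."""
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    aws_session_token: Optional[str] = None
    profile_name: Optional[str] = None


def sts_client_kwargs(conn_config: ConnConfig) -> dict:
    """Build kwargs for the STS create_client call, forwarding the
    session token when one is set. The profile is deliberately absent:
    it belongs on the session (e.g. AioSession(profile=...)), not here."""
    kwargs = {
        "aws_access_key_id": conn_config.aws_access_key_id,
        "aws_secret_access_key": conn_config.aws_secret_access_key,
    }
    if conn_config.aws_session_token:
        kwargs["aws_session_token"] = conn_config.aws_session_token
    return kwargs
```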



##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):
+    """
+    Interacts with AWS using aiobotocore asynchronously.
+
+    .. note::
+        AwsBaseHookAsync uses aiobotocore to create asynchronous S3 hooks. Hence, AwsBaseHookAsync
+        only supports the authentication mechanism that aiobotocore supports. Currently, AwsBaseHookAsync supports
+        only AWS STS client method ``assume_role`` provided in the Airflow connection extra args via aiobotocore.
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param verify: Whether or not to verify SSL certificates.
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+    :param config: Configuration for botocore client.
+    .. seealso::
+        `AWS API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
+    """
+
+    async def get_client_async(self) -> AioBaseClient:
+        """Create an Async Client object to communicate with AWS services."""
+
+        connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)
+

Review Comment:
   This is inconsistent with `AwsBaseHook`:
   1. If the user provides None, the default `boto3` (to be honest, `botocore`) credentials strategy is used (env vars, EC2/ECS metadata, etc.)
   2. If the user provides an invalid `aws_conn_id`, it falls back to the default `boto3` credentials strategy
   
   https://github.com/apache/airflow/blob/352d492c66e69e816fb1547e46fc1e3b7ba32066/airflow/providers/amazon/aws/hooks/base_aws.py#L507-L526
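   The fallback pattern can be sketched like this (`resolve_connection` is a hypothetical helper, with the connection lookup injected so the sketch stays self-contained):

```python
import logging

log = logging.getLogger(__name__)


def resolve_connection(get_connection, aws_conn_id):
    """Mirror AwsBaseHook's behaviour: an unset or unknown aws_conn_id
    resolves to None, i.e. the default botocore credential strategy
    (env vars, EC2/ECS metadata, etc.)."""
    if not aws_conn_id:
        return None
    try:
        return get_connection(aws_conn_id)
    except Exception:  # the real hook catches AirflowNotFoundException
        log.warning("Unable to find AWS Connection ID '%s', switching to empty.", aws_conn_id)
        return None
```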



##########
airflow/providers/amazon/aws/hooks/base_aws.py:
##########
@@ -843,3 +848,82 @@ def _parse_s3_config(config_file_name: str, config_format: str | None = "boto",
         config_format=config_format,
         profile=profile,
     )
+
+
+class AwsBaseHookAsync(AwsBaseHook):
+    """
+    Interacts with AWS using aiobotocore asynchronously.
+
+    .. note::
+        AwsBaseHookAsync uses aiobotocore to create asynchronous S3 hooks. Hence, AwsBaseHookAsync
+        only supports the authentication mechanism that aiobotocore supports. Currently, AwsBaseHookAsync supports
+        only AWS STS client method ``assume_role`` provided in the Airflow connection extra args via aiobotocore.
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :param verify: Whether or not to verify SSL certificates.
+    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
+    :param client_type: boto3.client client_type. Eg 's3', 'emr' etc
+    :param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
+    :param config: Configuration for botocore client.
+    .. seealso::
+        `AWS API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html>`_
+    """
+
+    async def get_client_async(self) -> AioBaseClient:
+        """Create an Async Client object to communicate with AWS services."""
+
+        connection_object = await sync_to_async(self.get_connection)(self.aws_conn_id)
+
+        conn_config = AwsConnectionWrapper(
+            conn=connection_object,
+            region_name=self.region_name,
+            botocore_config=self.config,
+            verify=self.verify,
+        )
+
+        async_connection = get_session()

Review Comment:
   I think we could use `AioSession()` instead, and even provide some useful functionality there.
   Or create an `AsyncBaseSessionFactory` class which takes care of everything, like `BaseSessionFactory` does.
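   A rough shape for such a factory, mirroring `BaseSessionFactory` (all names below are hypothetical, and aiobotocore is stubbed out so the skeleton stays self-contained):

```python
from typing import Optional


class AsyncBaseSessionFactory:
    """Hypothetical skeleton: one place that resolves region, profile,
    and assume-role settings for async sessions, the way
    BaseSessionFactory does for sync ones."""

    def __init__(self, conn_config: dict, region_name: Optional[str] = None):
        self.conn_config = conn_config
        self.region_name = region_name or conn_config.get("region_name")

    def session_kwargs(self) -> dict:
        # Kwargs the AioSession would be built with.
        kwargs = {}
        if self.conn_config.get("profile_name"):
            kwargs["profile"] = self.conn_config["profile_name"]
        return kwargs

    def create_session(self):
        # A real implementation would return
        # aiobotocore.session.AioSession(**self.session_kwargs());
        # the kwargs dict stands in here.
        return self.session_kwargs()
```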



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
