This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c1bba9906c Fix assignment of template field in `__init__` in 
`AwsToAwsBaseOperator` (#36604)
c1bba9906c is described below

commit c1bba9906c4923bcb80364afc5450eaaba4c0a21
Author: rom sharon <[email protected]>
AuthorDate: Fri Jan 5 15:31:35 2024 +0200

    Fix assignment of template field in `__init__` in `AwsToAwsBaseOperator` 
(#36604)
    
    * add assignment of templated field
    
    * fix static checks
    
    * fix static checks
---
 airflow/providers/amazon/aws/hooks/s3.py                 | 4 +++-
 airflow/providers/amazon/aws/transfers/base.py           | 9 ++++++---
 airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py | 3 ++-
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/s3.py 
b/airflow/providers/amazon/aws/hooks/s3.py
index d75e3e337a..9d4ecaa38d 100644
--- a/airflow/providers/amazon/aws/hooks/s3.py
+++ b/airflow/providers/amazon/aws/hooks/s3.py
@@ -40,6 +40,8 @@ from urllib.parse import urlsplit
 from uuid import uuid4
 
 if TYPE_CHECKING:
+    from airflow.utils.types import ArgNotSet
+
     with suppress(ImportError):
         from aiobotocore.client import AioBaseClient
 
@@ -167,7 +169,7 @@ class S3Hook(AwsBaseHook):
 
     def __init__(
         self,
-        aws_conn_id: str | None = AwsBaseHook.default_conn_name,
+        aws_conn_id: str | None | ArgNotSet = AwsBaseHook.default_conn_name,
         transfer_config_args: dict | None = None,
         extra_args: dict | None = None,
         *args,
diff --git a/airflow/providers/amazon/aws/transfers/base.py 
b/airflow/providers/amazon/aws/transfers/base.py
index 2a58c48449..71433e5e3e 100644
--- a/airflow/providers/amazon/aws/transfers/base.py
+++ b/airflow/providers/amazon/aws/transfers/base.py
@@ -60,11 +60,14 @@ class AwsToAwsBaseOperator(BaseOperator):
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
+        self.source_aws_conn_id = source_aws_conn_id
+        self.dest_aws_conn_id = dest_aws_conn_id
         if not isinstance(aws_conn_id, ArgNotSet):
             warnings.warn(_DEPRECATION_MSG, AirflowProviderDeprecationWarning, 
stacklevel=3)
             self.source_aws_conn_id = aws_conn_id
         else:
             self.source_aws_conn_id = source_aws_conn_id
-        self.dest_aws_conn_id = (
-            self.source_aws_conn_id if isinstance(dest_aws_conn_id, ArgNotSet) 
else dest_aws_conn_id
-        )
+        if isinstance(dest_aws_conn_id, ArgNotSet):
+            self.dest_aws_conn_id = self.source_aws_conn_id
+        else:
+            self.dest_aws_conn_id = dest_aws_conn_id
diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py 
b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
index 5351f3ff7c..646d3a96f8 100644
--- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -35,6 +35,7 @@ from airflow.providers.amazon.aws.transfers.base import 
AwsToAwsBaseOperator
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
+    from airflow.utils.types import ArgNotSet
 
 
 class JSONEncoder(json.JSONEncoder):
@@ -55,7 +56,7 @@ def _upload_file_to_s3(
     file_obj: IO,
     bucket_name: str,
     s3_key_prefix: str,
-    aws_conn_id: str | None = AwsBaseHook.default_conn_name,
+    aws_conn_id: str | None | ArgNotSet = AwsBaseHook.default_conn_name,
 ) -> None:
     s3_client = S3Hook(aws_conn_id=aws_conn_id).get_conn()
     file_obj.seek(0)

Reply via email to