This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c1bba9906c Fix assignment of template field in `__init__` in `AwsToAwsBaseOperator` (#36604)
c1bba9906c is described below
commit c1bba9906c4923bcb80364afc5450eaaba4c0a21
Author: rom sharon <[email protected]>
AuthorDate: Fri Jan 5 15:31:35 2024 +0200
Fix assignment of template field in `__init__` in `AwsToAwsBaseOperator` (#36604)
* add assignment of templated field
* fix static checks
* fix static checks
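For context, Airflow resolves `template_fields` by reading and re-assigning instance attributes of the same names after `__init__` returns, so a templated argument has to be stored on `self` verbatim. A minimal sketch of the pattern this fix follows (illustrative operator and defaults, not code from this commit):

    from airflow.models.baseoperator import BaseOperator

    class MyTransferOperator(BaseOperator):
        # Fields listed here are rendered by Jinja at task runtime, not in __init__.
        template_fields = ("source_aws_conn_id", "dest_aws_conn_id")

        def __init__(self, *, source_aws_conn_id=None, dest_aws_conn_id=None, **kwargs):
            super().__init__(**kwargs)
            # Store the raw (possibly "{{ ... }}") values first; any later branching
            # must tolerate unrendered template strings.
            self.source_aws_conn_id = source_aws_conn_id
            self.dest_aws_conn_id = dest_aws_conn_id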
---
airflow/providers/amazon/aws/hooks/s3.py                 | 4 +++-
airflow/providers/amazon/aws/transfers/base.py            | 9 ++++++---
airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py | 3 ++-
3 files changed, 11 insertions(+), 5 deletions(-)
diff --git a/airflow/providers/amazon/aws/hooks/s3.py b/airflow/providers/amazon/aws/hooks/s3.py
index d75e3e337a..9d4ecaa38d 100644
--- a/airflow/providers/amazon/aws/hooks/s3.py
+++ b/airflow/providers/amazon/aws/hooks/s3.py
@@ -40,6 +40,8 @@ from urllib.parse import urlsplit
from uuid import uuid4
if TYPE_CHECKING:
+    from airflow.utils.types import ArgNotSet
+
with suppress(ImportError):
    from aiobotocore.client import AioBaseClient
@@ -167,7 +169,7 @@ class S3Hook(AwsBaseHook):
    def __init__(
        self,
-        aws_conn_id: str | None = AwsBaseHook.default_conn_name,
+        aws_conn_id: str | None | ArgNotSet = AwsBaseHook.default_conn_name,
        transfer_config_args: dict | None = None,
        extra_args: dict | None = None,
        *args,
diff --git a/airflow/providers/amazon/aws/transfers/base.py b/airflow/providers/amazon/aws/transfers/base.py
index 2a58c48449..71433e5e3e 100644
--- a/airflow/providers/amazon/aws/transfers/base.py
+++ b/airflow/providers/amazon/aws/transfers/base.py
@@ -60,11 +60,14 @@ class AwsToAwsBaseOperator(BaseOperator):
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
+        self.source_aws_conn_id = source_aws_conn_id
+        self.dest_aws_conn_id = dest_aws_conn_id
        if not isinstance(aws_conn_id, ArgNotSet):
            warnings.warn(_DEPRECATION_MSG, AirflowProviderDeprecationWarning, stacklevel=3)
            self.source_aws_conn_id = aws_conn_id
        else:
            self.source_aws_conn_id = source_aws_conn_id
-        self.dest_aws_conn_id = (
-            self.source_aws_conn_id if isinstance(dest_aws_conn_id, ArgNotSet) else dest_aws_conn_id
-        )
+        if isinstance(dest_aws_conn_id, ArgNotSet):
+            self.dest_aws_conn_id = self.source_aws_conn_id
+        else:
+            self.dest_aws_conn_id = dest_aws_conn_id
diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
index 5351f3ff7c..646d3a96f8 100644
--- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -35,6 +35,7 @@ from airflow.providers.amazon.aws.transfers.base import AwsToAwsBaseOperator
if TYPE_CHECKING:
    from airflow.utils.context import Context
+    from airflow.utils.types import ArgNotSet
class JSONEncoder(json.JSONEncoder):
@@ -55,7 +56,7 @@ def _upload_file_to_s3(
    file_obj: IO,
    bucket_name: str,
    s3_key_prefix: str,
-    aws_conn_id: str | None = AwsBaseHook.default_conn_name,
+    aws_conn_id: str | None | ArgNotSet = AwsBaseHook.default_conn_name,
) -> None:
    s3_client = S3Hook(aws_conn_id=aws_conn_id).get_conn()
    file_obj.seek(0)
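As a usage note: with the conn-id attributes assigned in `__init__`, the inherited template fields render as expected. A rough sketch of how the transfer operator can now be parameterized (task id, table, bucket, and Variable name are made up for illustration):

    from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

    backup_op = DynamoDBToS3Operator(
        task_id="dynamodb_to_s3_backup",
        dynamodb_table_name="my_table",
        s3_bucket_name="my-backup-bucket",
        s3_key_prefix="backups/",
        # Templated value is stored verbatim at construction and rendered at runtime.
        source_aws_conn_id="{{ var.value.source_aws_conn }}",
        # dest_aws_conn_id omitted: falls back to source_aws_conn_id per the diff above.
    )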