This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b6499ace63 Rename `bucket` to `gcs_bucket` in `GCSToS3Operator` (#33031)
b6499ace63 is described below
commit b6499ace635ea36bb690a9a1373e8dddec6151e2
Author: Hank Ehly <[email protected]>
AuthorDate: Tue Sep 26 21:58:22 2023 +0900
Rename `bucket` to `gcs_bucket` in `GCSToS3Operator` (#33031)
---------
Co-authored-by: Vincent <[email protected]>
---
.../providers/amazon/aws/transfers/gcs_to_s3.py | 27 ++++++++---
.../amazon/aws/transfers/test_gcs_to_s3.py | 56 +++++++++++++++++-----
.../providers/amazon/aws/example_gcs_to_s3.py | 2 +-
3 files changed, 66 insertions(+), 19 deletions(-)
diff --git a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
index b21768b4ab..cdf0dcdc44 100644
--- a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
@@ -41,7 +41,8 @@ class GCSToS3Operator(BaseOperator):
For more information on how to use this operator, take a look at the
guide:
:ref:`howto/operator:GCSToS3Operator`
- :param bucket: The Google Cloud Storage bucket to find the objects. (templated)
+ :param gcs_bucket: The Google Cloud Storage bucket to find the objects. (templated)
+ :param bucket: (Deprecated) Use ``gcs_bucket`` instead.
:param prefix: Prefix string which filters objects whose name begin with this prefix. (templated)
:param delimiter: (Deprecated) The delimiter by which you want to filter the objects. (templated)
@@ -87,7 +88,7 @@ class GCSToS3Operator(BaseOperator):
"""
template_fields: Sequence[str] = (
- "bucket",
+ "gcs_bucket",
"prefix",
"delimiter",
"dest_s3_key",
@@ -99,7 +100,8 @@ class GCSToS3Operator(BaseOperator):
def __init__(
self,
*,
- bucket: str,
+ gcs_bucket: str | None = None,
+ bucket: str | None = None,
prefix: str | None = None,
delimiter: str | None = None,
gcp_conn_id: str = "google_cloud_default",
@@ -117,7 +119,18 @@ class GCSToS3Operator(BaseOperator):
) -> None:
super().__init__(**kwargs)
- self.bucket = bucket
+ if bucket:
+ warnings.warn(
+ "The ``bucket`` parameter is deprecated and will be removed in a future version. "
+ "Please use ``gcs_bucket`` instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ self.gcs_bucket = bucket
+ if gcs_bucket:
+ self.gcs_bucket = gcs_bucket
+ if not (bucket or gcs_bucket):
+ raise ValueError("You must pass either ``bucket`` or ``gcs_bucket``.")
self.prefix = prefix
self.gcp_conn_id = gcp_conn_id
self.dest_aws_conn_id = dest_aws_conn_id
@@ -161,13 +174,13 @@ class GCSToS3Operator(BaseOperator):
self.log.info(
"Getting list of the files. Bucket: %s; Delimiter: %s; Prefix: %s",
- self.bucket,
+ self.gcs_bucket,
self.delimiter,
self.prefix,
)
list_kwargs = {
- "bucket_name": self.bucket,
+ "bucket_name": self.gcs_bucket,
"prefix": self.prefix,
"delimiter": self.delimiter,
"user_project": self.gcp_user_project,
@@ -206,7 +219,7 @@ class GCSToS3Operator(BaseOperator):
if gcs_files:
for file in gcs_files:
with gcs_hook.provide_file(
- object_name=file, bucket_name=self.bucket, user_project=self.gcp_user_project
+ object_name=file, bucket_name=self.gcs_bucket, user_project=self.gcp_user_project
) as local_tmp_file:
dest_key = os.path.join(self.dest_s3_key, file)
self.log.info("Saving file to %s", dest_key)
diff --git a/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py b/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py
index 9d5c497de1..520b521a21 100644
--- a/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py
@@ -57,7 +57,7 @@ class TestGCSToS3Operator:
operator = GCSToS3Operator(
task_id=TASK_ID,
- bucket=GCS_BUCKET,
+ gcs_bucket=GCS_BUCKET,
prefix=PREFIX,
dest_aws_conn_id="aws_default",
dest_s3_key=S3_BUCKET,
@@ -86,7 +86,7 @@ class TestGCSToS3Operator:
with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
- bucket=GCS_BUCKET,
+ gcs_bucket=GCS_BUCKET,
prefix=PREFIX,
delimiter=DELIMITER,
dest_aws_conn_id="aws_default",
@@ -115,7 +115,7 @@ class TestGCSToS3Operator:
with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
- bucket=GCS_BUCKET,
+ gcs_bucket=GCS_BUCKET,
prefix=PREFIX,
delimiter=DELIMITER,
dest_aws_conn_id="aws_default",
@@ -153,7 +153,7 @@ class TestGCSToS3Operator:
with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
- bucket=GCS_BUCKET,
+ gcs_bucket=GCS_BUCKET,
prefix=PREFIX,
delimiter=DELIMITER,
dest_aws_conn_id="aws_default",
@@ -181,7 +181,7 @@ class TestGCSToS3Operator:
with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
- bucket=GCS_BUCKET,
+ gcs_bucket=GCS_BUCKET,
prefix=PREFIX,
delimiter=DELIMITER,
dest_aws_conn_id="aws_default",
@@ -196,6 +196,40 @@ class TestGCSToS3Operator:
assert sorted(MOCK_FILES) == sorted(uploaded_files)
assert sorted(MOCK_FILES) == sorted(hook.list_keys("bucket", delimiter="/"))
+ @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
+ def test_execute_gcs_bucket_rename_compatibility(self, mock_hook):
+ """
+ Tests the same conditions as `test_execute` using the deprecated `bucket` parameter instead of
+ `gcs_bucket`. This test can be removed when the `bucket` parameter is removed.
+ """
+ mock_hook.return_value.list.return_value = MOCK_FILES
+ with NamedTemporaryFile() as f:
+ gcs_provide_file = mock_hook.return_value.provide_file
+ gcs_provide_file.return_value.__enter__.return_value.name = f.name
+ bucket_param_deprecated_message = (
+ "The ``bucket`` parameter is deprecated and will be removed in a future version. "
+ "Please use ``gcs_bucket`` instead."
+ )
+ with pytest.deprecated_call(match=bucket_param_deprecated_message):
+ operator = GCSToS3Operator(
+ task_id=TASK_ID,
+ bucket=GCS_BUCKET,
+ prefix=PREFIX,
+ match_glob=DELIMITER,
+ dest_aws_conn_id="aws_default",
+ dest_s3_key=S3_BUCKET,
+ replace=False,
+ )
+ hook, _ = _create_test_bucket()
+ # we expect all MOCK_FILES to be uploaded
+ # and all MOCK_FILES to be present at the S3 bucket
+ uploaded_files = operator.execute(None)
+ assert sorted(MOCK_FILES) == sorted(uploaded_files)
+ assert sorted(MOCK_FILES) == sorted(hook.list_keys("bucket", delimiter="/"))
+ with pytest.raises(ValueError) as excinfo:
+ GCSToS3Operator(task_id=TASK_ID, dest_s3_key=S3_BUCKET)
+ assert str(excinfo.value) == "You must pass either ``bucket`` or ``gcs_bucket``."
+
@mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
def test_execute_with_replace(self, mock_hook):
mock_hook.return_value.list.return_value = MOCK_FILES
@@ -206,7 +240,7 @@ class TestGCSToS3Operator:
with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
- bucket=GCS_BUCKET,
+ gcs_bucket=GCS_BUCKET,
prefix=PREFIX,
delimiter=DELIMITER,
dest_aws_conn_id="aws_default",
@@ -233,7 +267,7 @@ class TestGCSToS3Operator:
with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
- bucket=GCS_BUCKET,
+ gcs_bucket=GCS_BUCKET,
prefix=PREFIX,
delimiter=DELIMITER,
dest_aws_conn_id="aws_default",
@@ -261,7 +295,7 @@ class TestGCSToS3Operator:
with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
- bucket=GCS_BUCKET,
+ gcs_bucket=GCS_BUCKET,
prefix=PREFIX,
delimiter=DELIMITER,
dest_aws_conn_id="aws_default",
@@ -284,7 +318,7 @@ class TestGCSToS3Operator:
with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
- bucket=GCS_BUCKET,
+ gcs_bucket=GCS_BUCKET,
prefix=PREFIX,
delimiter=DELIMITER,
dest_aws_conn_id="aws_default",
@@ -310,7 +344,7 @@ class TestGCSToS3Operator:
with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
- bucket=GCS_BUCKET,
+ gcs_bucket=GCS_BUCKET,
prefix=PREFIX,
delimiter=DELIMITER,
dest_aws_conn_id="aws_default",
@@ -335,7 +369,7 @@ class TestGCSToS3Operator:
with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
- bucket=GCS_BUCKET,
+ gcs_bucket=GCS_BUCKET,
prefix=PREFIX,
delimiter=DELIMITER,
dest_aws_conn_id="aws_default",
diff --git a/tests/system/providers/amazon/aws/example_gcs_to_s3.py b/tests/system/providers/amazon/aws/example_gcs_to_s3.py
index 68db86f82a..e039a98a51 100644
--- a/tests/system/providers/amazon/aws/example_gcs_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_gcs_to_s3.py
@@ -80,7 +80,7 @@ with DAG(
# [START howto_transfer_gcs_to_s3]
gcs_to_s3 = GCSToS3Operator(
task_id="gcs_to_s3",
- bucket=gcs_bucket,
+ gcs_bucket=gcs_bucket,
dest_s3_key=f"s3://{s3_bucket}/{s3_key}",
replace=True,
gcp_user_project=gcp_user_project,