This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 575bf2f040 Bug fix GCSToS3Operator: avoid `ValueError` when
`replace=False` with files already in S3 (#32322)
575bf2f040 is described below
commit 575bf2f04089b7c99a8ee30637f1d88492ef4742
Author: Akash Sharma <[email protected]>
AuthorDate: Tue Jul 4 20:54:11 2023 +0530
Bug fix GCSToS3Operator: avoid `ValueError` when `replace=False` with files
already in S3 (#32322)
---
.../providers/amazon/aws/transfers/gcs_to_s3.py | 5 ++
.../amazon/aws/transfers/test_gcs_to_s3.py | 69 ++++++++++++++++------
2 files changed, 57 insertions(+), 17 deletions(-)
diff --git a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
index 2213de2b60..d57de7e11e 100644
--- a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
@@ -161,6 +161,11 @@ class GCSToS3Operator(BaseOperator):
# and only keep those files which are present in
# Google Cloud Storage and not in S3
bucket_name, prefix = S3Hook.parse_s3_url(self.dest_s3_key)
+ # only append "/" when prefix is non-empty: a bare "/" prefix would
+ # match no keys (empty list), whereas an empty prefix lists every
+ # object in the bucket
+ if prefix:
+ prefix = prefix if prefix.endswith("/") else f"{prefix}/"
# look for the bucket and the prefix to avoid look into
# parent directories/keys
existing_files = s3_hook.list_keys(bucket_name, prefix=prefix)
diff --git a/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py
b/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py
index a7a0b2e430..5e64f167ba 100644
--- a/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py
@@ -33,6 +33,7 @@ PREFIX = "TEST"
S3_BUCKET = "s3://bucket/"
MOCK_FILES = ["TEST1.csv", "TEST2.csv", "TEST3.csv"]
S3_ACL_POLICY = "private-read"
+deprecated_call_match = "Usage of 'delimiter' is deprecated, please use
'match_glob' instead"
def _create_test_bucket():
@@ -47,8 +48,6 @@ def _create_test_bucket():
@mock_s3
class TestGCSToS3Operator:
-
- # Test0: match_glob
@mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
def test_execute__match_glob(self, mock_hook):
mock_hook.return_value.list.return_value = MOCK_FILES
@@ -73,7 +72,6 @@ class TestGCSToS3Operator:
bucket_name=GCS_BUCKET, delimiter=None,
match_glob=f"**/*{DELIMITER}", prefix=PREFIX
)
- # Test1: incremental behaviour (just some files missing)
@mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
def test_execute_incremental(self, mock_hook):
mock_hook.return_value.list.return_value = MOCK_FILES
@@ -81,7 +79,7 @@ class TestGCSToS3Operator:
gcs_provide_file = mock_hook.return_value.provide_file
gcs_provide_file.return_value.__enter__.return_value.name = f.name
- with pytest.deprecated_call():
+ with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
bucket=GCS_BUCKET,
@@ -100,15 +98,17 @@ class TestGCSToS3Operator:
assert sorted(MOCK_FILES[1:]) == sorted(uploaded_files)
assert sorted(MOCK_FILES) == sorted(hook.list_keys("bucket",
delimiter="/"))
- # Test2: All the files are already in origin and destination without
replace
@mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
def test_execute_without_replace(self, mock_hook):
+ """
+ Tests scenario where all the files are already in origin and
destination without replace
+ """
mock_hook.return_value.list.return_value = MOCK_FILES
with NamedTemporaryFile() as f:
gcs_provide_file = mock_hook.return_value.provide_file
gcs_provide_file.return_value.__enter__.return_value.name = f.name
- with pytest.deprecated_call():
+ with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
bucket=GCS_BUCKET,
@@ -128,15 +128,53 @@ class TestGCSToS3Operator:
assert [] == uploaded_files
assert sorted(MOCK_FILES) == sorted(hook.list_keys("bucket",
delimiter="/"))
- # Test3: There are no files in destination bucket
+ @pytest.mark.parametrize(
+ argnames="dest_s3_url",
+ argvalues=[f"{S3_BUCKET}/test/", f"{S3_BUCKET}/test"],
+ )
+ @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
+ def test_execute_without_replace_with_folder_structure(self, mock_hook,
dest_s3_url):
+ mock_files_gcs = [f"test{idx}/{mock_file}" for idx, mock_file in
enumerate(MOCK_FILES)]
+ mock_files_s3 = [f"test/test{idx}/{mock_file}" for idx, mock_file in
enumerate(MOCK_FILES)]
+ mock_hook.return_value.list.return_value = mock_files_gcs
+
+ hook, bucket = _create_test_bucket()
+ for mock_file_s3 in mock_files_s3:
+ bucket.put_object(Key=mock_file_s3, Body=b"testing")
+
+ with NamedTemporaryFile() as f:
+ gcs_provide_file = mock_hook.return_value.provide_file
+ gcs_provide_file.return_value.__enter__.return_value.name = f.name
+
+ with pytest.deprecated_call(match=deprecated_call_match):
+ operator = GCSToS3Operator(
+ task_id=TASK_ID,
+ bucket=GCS_BUCKET,
+ prefix=PREFIX,
+ delimiter=DELIMITER,
+ dest_aws_conn_id="aws_default",
+ dest_s3_key=dest_s3_url,
+ replace=False,
+ )
+
+ # we expect nothing to be uploaded
+ # and all the MOCK_FILES to be present at the S3 bucket
+ uploaded_files = operator.execute(None)
+
+ assert [] == uploaded_files
+ assert sorted(mock_files_s3) == sorted(hook.list_keys("bucket",
prefix="test/"))
+
@mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
def test_execute(self, mock_hook):
+ """
+ Tests the scenario where there are no files in destination bucket
+ """
mock_hook.return_value.list.return_value = MOCK_FILES
with NamedTemporaryFile() as f:
gcs_provide_file = mock_hook.return_value.provide_file
gcs_provide_file.return_value.__enter__.return_value.name = f.name
- with pytest.deprecated_call():
+ with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
bucket=GCS_BUCKET,
@@ -154,7 +192,6 @@ class TestGCSToS3Operator:
assert sorted(MOCK_FILES) == sorted(uploaded_files)
assert sorted(MOCK_FILES) == sorted(hook.list_keys("bucket",
delimiter="/"))
- # Test4: Destination and Origin are in sync but replace all files in
destination
@mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
def test_execute_with_replace(self, mock_hook):
mock_hook.return_value.list.return_value = MOCK_FILES
@@ -162,7 +199,7 @@ class TestGCSToS3Operator:
gcs_provide_file = mock_hook.return_value.provide_file
gcs_provide_file.return_value.__enter__.return_value.name = f.name
- with pytest.deprecated_call():
+ with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
bucket=GCS_BUCKET,
@@ -182,7 +219,6 @@ class TestGCSToS3Operator:
assert sorted(MOCK_FILES) == sorted(uploaded_files)
assert sorted(MOCK_FILES) == sorted(hook.list_keys("bucket",
delimiter="/"))
- # Test5: Incremental sync with replace
@mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
def test_execute_incremental_with_replace(self, mock_hook):
mock_hook.return_value.list.return_value = MOCK_FILES
@@ -190,7 +226,7 @@ class TestGCSToS3Operator:
gcs_provide_file = mock_hook.return_value.provide_file
gcs_provide_file.return_value.__enter__.return_value.name = f.name
- with pytest.deprecated_call():
+ with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
bucket=GCS_BUCKET,
@@ -218,7 +254,7 @@ class TestGCSToS3Operator:
s3_mock_hook.return_value = mock.Mock()
s3_mock_hook.parse_s3_url.return_value = mock.Mock()
- with pytest.deprecated_call():
+ with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
bucket=GCS_BUCKET,
@@ -241,7 +277,7 @@ class TestGCSToS3Operator:
s3_mock_hook.return_value = mock.Mock()
s3_mock_hook.parse_s3_url.return_value = mock.Mock()
- with pytest.deprecated_call():
+ with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
bucket=GCS_BUCKET,
@@ -259,7 +295,6 @@ class TestGCSToS3Operator:
aws_conn_id="aws_default", extra_args={"ContentLanguage":
"value"}, verify=None
)
- # Test6: s3_acl_policy parameter is set
@mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.load_file")
def test_execute_with_s3_acl_policy(self, mock_load_file, mock_gcs_hook):
@@ -268,7 +303,7 @@ class TestGCSToS3Operator:
gcs_provide_file = mock_gcs_hook.return_value.provide_file
gcs_provide_file.return_value.__enter__.return_value.name = f.name
- with pytest.deprecated_call():
+ with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
bucket=GCS_BUCKET,
@@ -293,7 +328,7 @@ class TestGCSToS3Operator:
gcs_provide_file = mock_hook.return_value.provide_file
gcs_provide_file.return_value.__enter__.return_value.name = f.name
- with pytest.deprecated_call():
+ with pytest.deprecated_call(match=deprecated_call_match):
operator = GCSToS3Operator(
task_id=TASK_ID,
bucket=GCS_BUCKET,