This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e4757d6dfa Fix GCSToGCSOperator copy without wildcard and exact_match=True (#32376)
e4757d6dfa is described below
commit e4757d6dfa6e7385eb90c38c60ab8fefa24e7a0e
Author: max <[email protected]>
AuthorDate: Thu Jul 6 10:00:19 2023 +0200
Fix GCSToGCSOperator copy without wildcard and exact_match=True (#32376)
---
.../providers/google/cloud/transfers/gcs_to_gcs.py | 10 +++++-
.../google/cloud/transfers/test_gcs_to_gcs.py | 37 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index 2b39df1c6a..222a1ecad7 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -370,6 +370,8 @@ class GCSToGCSOperator(BaseOperator):
self.source_bucket, prefix=prefix, delimiter=self.delimiter,
match_glob=self.match_glob
)
+ objects = [obj for obj in objects if self._check_exact_match(obj,
prefix)]
+
if not self.replace:
# If we are not replacing, ignore files already existing in source
buckets
objects = self._ignore_existing_files(
@@ -405,7 +407,7 @@ class GCSToGCSOperator(BaseOperator):
def _copy_directory(self, hook, source_objects, prefix):
_prefix = prefix.rstrip("/") + "/"
for source_obj in source_objects:
- if self.exact_match and (source_obj != prefix or not
source_obj.endswith(prefix)):
+ if not self._check_exact_match(source_obj, prefix):
continue
if self.destination_object is None:
destination_object = source_obj
@@ -417,6 +419,12 @@ class GCSToGCSOperator(BaseOperator):
hook=hook, source_object=source_obj,
destination_object=destination_object
)
+    def _check_exact_match(self, source_object: str, prefix: str) -> bool:
+        """Return whether source_object passes the exact_match filter for prefix."""
+        # With exact_match set, only an object named exactly ``prefix`` passes. Equality
+        # already implies endswith(prefix), so no separate suffix check is needed.
+        return not self.exact_match or source_object == prefix
+
def _copy_source_with_wildcard(self, hook, prefix):
total_wildcards = prefix.count(WILDCARD)
if total_wildcards > 1:
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index 1cf7be1166..3966be50cc 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -128,6 +128,7 @@ class TestGoogleCloudStorageToCloudStorageOperator:
def test_copy_file_with_exact_match(self, mock_hook):
SOURCE_FILES = [
"test_object.txt",
+ "test_object.txt.abc",
"test_object.txt.copy/",
"test_object.txt.folder/",
]
@@ -145,6 +146,42 @@ class TestGoogleCloudStorageToCloudStorageOperator:
mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None,
match_glob=None),
]
mock_hook.return_value.list.assert_has_calls(mock_calls)
+ mock_hook.return_value.rewrite.assert_has_calls(
+ [
+ mock.call(TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET,
"test_object.txt"),
+ ]
+ )
+
+    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
+    def test_copy_file_with_exact_match_destination(self, mock_hook):
+        """exact_match=True with a destination_object copies only the exactly-named object."""
+        SOURCE_FILES = [
+            "test_object.txt",
+            "test_object.txt.abc",
+            "test_object.txt.copy/",
+            "test_object.txt.folder/",
+        ]
+        DESTINATION_OBJ = f"{DESTINATION_OBJECT_PREFIX}/test_object.txt"
+
+        mock_hook.return_value.list.return_value = SOURCE_FILES
+        operator = GCSToGCSOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_NO_WILDCARD,
+            destination_bucket=DESTINATION_BUCKET,
+            destination_object=DESTINATION_OBJ,
+            exact_match=True,
+        )
+
+        operator.execute(None)
+        mock_hook.return_value.list.assert_has_calls(
+            [mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None, match_glob=None)]
+        )
+
+        # Exactly one rewrite: copying a prefix-sharing object like "test_object.txt.abc" must fail.
+        mock_hook.return_value.rewrite.assert_called_once_with(
+            TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, DESTINATION_OBJ
+        )
@mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
def test_execute_prefix_and_suffix(self, mock_hook):