This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e4757d6dfa Fix GCSToGCSOperator copy without wildcard and exact_match=True (#32376)
e4757d6dfa is described below

commit e4757d6dfa6e7385eb90c38c60ab8fefa24e7a0e
Author: max <[email protected]>
AuthorDate: Thu Jul 6 10:00:19 2023 +0200

    Fix GCSToGCSOperator copy without wildcard and exact_match=True (#32376)
---
 .../providers/google/cloud/transfers/gcs_to_gcs.py | 10 +++++-
 .../google/cloud/transfers/test_gcs_to_gcs.py      | 37 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index 2b39df1c6a..222a1ecad7 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -370,6 +370,8 @@ class GCSToGCSOperator(BaseOperator):
             self.source_bucket, prefix=prefix, delimiter=self.delimiter, match_glob=self.match_glob
         )
 
+        objects = [obj for obj in objects if self._check_exact_match(obj, prefix)]
+
         if not self.replace:
             # If we are not replacing, ignore files already existing in source buckets
             objects = self._ignore_existing_files(
@@ -405,7 +407,7 @@ class GCSToGCSOperator(BaseOperator):
     def _copy_directory(self, hook, source_objects, prefix):
         _prefix = prefix.rstrip("/") + "/"
         for source_obj in source_objects:
-            if self.exact_match and (source_obj != prefix or not source_obj.endswith(prefix)):
+            if not self._check_exact_match(source_obj, prefix):
                 continue
             if self.destination_object is None:
                 destination_object = source_obj
@@ -417,6 +419,12 @@ class GCSToGCSOperator(BaseOperator):
                 hook=hook, source_object=source_obj, destination_object=destination_object
             )
 
+    def _check_exact_match(self, source_object: str, prefix: str) -> bool:
+        """Checks whether source_object's name matches the prefix according to the exact_match flag."""
+        if self.exact_match and (source_object != prefix or not source_object.endswith(prefix)):
+            return False
+        return True
+
     def _copy_source_with_wildcard(self, hook, prefix):
         total_wildcards = prefix.count(WILDCARD)
         if total_wildcards > 1:
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index 1cf7be1166..3966be50cc 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -128,6 +128,7 @@ class TestGoogleCloudStorageToCloudStorageOperator:
     def test_copy_file_with_exact_match(self, mock_hook):
         SOURCE_FILES = [
             "test_object.txt",
+            "test_object.txt.abc",
             "test_object.txt.copy/",
             "test_object.txt.folder/",
         ]
@@ -145,6 +146,42 @@ class TestGoogleCloudStorageToCloudStorageOperator:
             mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None, match_glob=None),
         ]
         mock_hook.return_value.list.assert_has_calls(mock_calls)
+        mock_hook.return_value.rewrite.assert_has_calls(
+            [
+                mock.call(TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, "test_object.txt"),
+            ]
+        )
+
+    @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
+    def test_copy_file_with_exact_match_destination(self, mock_hook):
+        SOURCE_FILES = [
+            "test_object.txt",
+            "test_object.txt.abc",
+            "test_object.txt.copy/",
+            "test_object.txt.folder/",
+        ]
+        DESTINATION_OBJ = f"{DESTINATION_OBJECT_PREFIX}/test_object.txt"
+
+        mock_hook.return_value.list.return_value = SOURCE_FILES
+        operator = GCSToGCSOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_NO_WILDCARD,
+            destination_bucket=DESTINATION_BUCKET,
+            destination_object=DESTINATION_OBJ,
+            exact_match=True,
+        )
+
+        operator.execute(None)
+        mock_calls = [
+            mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None, match_glob=None),
+        ]
+        mock_hook.return_value.list.assert_has_calls(mock_calls)
+
+        mock_calls_rewrite = [
+            mock.call(TEST_BUCKET, "test_object.txt", DESTINATION_BUCKET, DESTINATION_OBJ),
+        ]
+        mock_hook.return_value.rewrite.assert_has_calls(mock_calls_rewrite)
 
     @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
     def test_execute_prefix_and_suffix(self, mock_hook):

Reply via email to