This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 82c244f9c7 Fix GCSToGCSOperator ignores replace parameter when there 
is no wildcard (#23340)
82c244f9c7 is described below

commit 82c244f9c7f24735ee952951bcb5add45422d186
Author: GitStart-AirFlow <[email protected]>
AuthorDate: Sun May 8 20:46:55 2022 +0100

    Fix GCSToGCSOperator ignores replace parameter when there is no wildcard 
(#23340)
---
 .../providers/google/cloud/transfers/gcs_to_gcs.py | 50 ++++++++++++++--------
 .../google/cloud/transfers/test_gcs_to_gcs.py      | 17 ++++++++
 2 files changed, 48 insertions(+), 19 deletions(-)

diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py 
b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index e18442e184..5a10aa7a32 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -256,6 +256,32 @@ class GCSToGCSOperator(BaseOperator):
             else:
                 self._copy_source_without_wildcard(hook=hook, prefix=prefix)
 
+    def _ignore_existing_files(self, hook, prefix, **kwargs):
+        """Return the source ``objects`` not already present in the destination bucket, in order."""
+        delimiter = kwargs.get('delimiter')
+        objects = kwargs.get('objects')
+        if self.destination_object is None:
+            existing_objects = set(hook.list(self.destination_bucket, prefix=prefix, delimiter=delimiter))
+        else:
+            self.log.info("Replaced destination_object with source_object prefix.")
+            destination_objects = hook.list(
+                self.destination_bucket,
+                prefix=self.destination_object,
+                delimiter=delimiter,
+            )
+            # Map destination names back onto source-prefixed names so they can be compared.
+            existing_objects = {
+                dest_object.replace(self.destination_object, prefix, 1) for dest_object in destination_objects
+            }
+        # Filter with a list (not a set difference) so copy order and log output follow
+        # the source listing instead of varying with str hash randomization between runs.
+        objects = [obj for obj in objects if obj not in existing_objects]
+        if objects:
+            self.log.info('%s files are going to be synced: %s.', len(objects), objects)
+        else:
+            self.log.info('There are no new files to sync. Have a nice day!')
+        return objects
+
     def _copy_source_without_wildcard(self, hook, prefix):
         """
         For source_objects with no wildcard, this operator would first list
@@ -298,6 +324,10 @@ class GCSToGCSOperator(BaseOperator):
         """
         objects = hook.list(self.source_bucket, prefix=prefix, 
delimiter=self.delimiter)
 
+        if not self.replace:
+            # If we are not replacing, ignore files already existing in source 
buckets
+            objects = self._ignore_existing_files(hook, prefix, 
objects=objects, delimiter=self.delimiter)
+
         # If objects is empty and we have prefix, let's check if prefix is a 
blob
         # and copy directly
         if len(objects) == 0 and prefix:
@@ -335,26 +365,8 @@ class GCSToGCSOperator(BaseOperator):
             # If we are not replacing, list all files in the Destination GCS 
bucket
             # and only keep those files which are present in
             # Source GCS bucket and not in Destination GCS bucket
+            objects = self._ignore_existing_files(hook, prefix_, 
delimiter=delimiter, objects=objects)
 
-            if self.destination_object is None:
-                existing_objects = hook.list(self.destination_bucket, 
prefix=prefix_, delimiter=delimiter)
-            else:
-                self.log.info("Replaced destination_object with source_object 
prefix.")
-                destination_objects = hook.list(
-                    self.destination_bucket,
-                    prefix=self.destination_object,
-                    delimiter=delimiter,
-                )
-                existing_objects = [
-                    dest_object.replace(self.destination_object, prefix_, 1)
-                    for dest_object in destination_objects
-                ]
-
-            objects = set(objects) - set(existing_objects)
-            if len(objects) > 0:
-                self.log.info('%s files are going to be synced: %s.', 
len(objects), objects)
-            else:
-                self.log.info('There are no new files to sync. Have a nice 
day!')
         for source_object in objects:
             if self.destination_object is None:
                 destination_object = source_object
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py 
b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index dec0d37827..7d5af935ea 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -104,6 +104,23 @@ class 
TestGoogleCloudStorageToCloudStorageOperator(unittest.TestCase):
         ]
         mock_hook.return_value.list.assert_has_calls(mock_calls)
 
+    @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
+    def test_execute_no_wildcard_with_replace_flag_false(self, mock_hook):
+        """replace=False without a wildcard lists both the source and destination buckets."""
+        op = GCSToGCSOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_NO_WILDCARD,
+            destination_bucket=DESTINATION_BUCKET,
+            replace=False,
+        )
+        op.execute(None)
+        expected_list_calls = [
+            mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None),
+            mock.call(DESTINATION_BUCKET, prefix="test_object.txt", delimiter=None),
+        ]
+        mock_hook.return_value.list.assert_has_calls(expected_list_calls)
+
     @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
     def test_execute_prefix_and_suffix(self, mock_hook):
         operator = GCSToGCSOperator(

Reply via email to