This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 82c244f9c7 Fix GCSToGCSOperator ignores replace parameter when there is no wildcard (#23340)
82c244f9c7 is described below
commit 82c244f9c7f24735ee952951bcb5add45422d186
Author: GitStart-AirFlow <[email protected]>
AuthorDate: Sun May 8 20:46:55 2022 +0100
Fix GCSToGCSOperator ignores replace parameter when there is no wildcard (#23340)
---
.../providers/google/cloud/transfers/gcs_to_gcs.py | 50 ++++++++++++++--------
.../google/cloud/transfers/test_gcs_to_gcs.py | 17 ++++++++
2 files changed, 48 insertions(+), 19 deletions(-)
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index e18442e184..5a10aa7a32 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -256,6 +256,32 @@ class GCSToGCSOperator(BaseOperator):
else:
self._copy_source_without_wildcard(hook=hook, prefix=prefix)
+    def _ignore_existing_files(self, hook, prefix, objects=None, delimiter=None, **kwargs):
+        """
+        Filter out source objects that already exist in the destination bucket.
+
+        Used when ``replace=False``: the destination bucket is listed and only
+        the source objects that are not already present there are kept.
+
+        :param hook: GCS hook used to list the destination bucket.
+        :param prefix: Source prefix that ``objects`` were listed under. When
+            ``destination_object`` is set, destination names are mapped back
+            onto this prefix before comparing.
+        :param objects: Source object names to filter. ``None`` is treated as
+            an empty listing.
+        :param delimiter: Delimiter forwarded to ``hook.list``.
+        :param kwargs: Accepted for backward compatibility and ignored.
+        :return: Source object names missing from the destination, without
+            duplicates and in their original listing order (a list, so copy
+            order and log output are deterministic).
+        """
+        if self.destination_object is None:
+            # Source and destination use the same object names.
+            existing_objects = hook.list(self.destination_bucket, prefix=prefix, delimiter=delimiter)
+        else:
+            self.log.info("Replaced destination_object with source_object prefix.")
+            destination_objects = hook.list(
+                self.destination_bucket,
+                prefix=self.destination_object,
+                delimiter=delimiter,
+            )
+            # Map the leading destination_object back to the source prefix so the
+            # destination names are comparable with the source listing.
+            existing_objects = [
+                dest_object.replace(self.destination_object, prefix, 1) for dest_object in destination_objects
+            ]
+
+        existing = set(existing_objects)
+        # dict.fromkeys de-duplicates like a set difference would, but keeps the
+        # source listing order instead of (per-process randomized) hash order.
+        objects = list(dict.fromkeys(obj for obj in (objects or []) if obj not in existing))
+        if objects:
+            self.log.info('%s files are going to be synced: %s.', len(objects), objects)
+        else:
+            self.log.info('There are no new files to sync. Have a nice day!')
+        return objects
+
def _copy_source_without_wildcard(self, hook, prefix):
"""
For source_objects with no wildcard, this operator would first list
@@ -298,6 +324,10 @@ class GCSToGCSOperator(BaseOperator):
"""
objects = hook.list(self.source_bucket, prefix=prefix,
delimiter=self.delimiter)
+ if not self.replace:
+ # If we are not replacing, drop source files that already exist in the destination bucket
+ objects = self._ignore_existing_files(hook, prefix,
objects=objects, delimiter=self.delimiter)
+
+ # NOTE(review): if every source object already exists at the destination,
+ # `objects` is empty at this point, so the "copy prefix directly" fallback
+ # below may still copy (and overwrite) `prefix` - confirm that path also
+ # honours self.replace.
# If objects is empty and we have prefix, let's check if prefix is a
blob
# and copy directly
if len(objects) == 0 and prefix:
@@ -335,26 +365,8 @@ class GCSToGCSOperator(BaseOperator):
# If we are not replacing, list all files in the Destination GCS
bucket
# and only keep those files which are present in
# Source GCS bucket and not in Destination GCS bucket
+ # The filtering is delegated to _ignore_existing_files, the same helper the
+ # no-wildcard path calls, so both paths skip existing files identically.
+ objects = self._ignore_existing_files(hook, prefix_,
delimiter=delimiter, objects=objects)
- if self.destination_object is None:
- existing_objects = hook.list(self.destination_bucket,
prefix=prefix_, delimiter=delimiter)
- else:
- self.log.info("Replaced destination_object with source_object
prefix.")
- destination_objects = hook.list(
- self.destination_bucket,
- prefix=self.destination_object,
- delimiter=delimiter,
- )
- existing_objects = [
- dest_object.replace(self.destination_object, prefix_, 1)
- for dest_object in destination_objects
- ]
-
- objects = set(objects) - set(existing_objects)
- if len(objects) > 0:
- self.log.info('%s files are going to be synced: %s.',
len(objects), objects)
- else:
- self.log.info('There are no new files to sync. Have a nice
day!')
for source_object in objects:
if self.destination_object is None:
destination_object = source_object
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index dec0d37827..7d5af935ea 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -104,6 +104,23 @@ class TestGoogleCloudStorageToCloudStorageOperator(unittest.TestCase):
]
mock_hook.return_value.list.assert_has_calls(mock_calls)
+ @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
+ def test_execute_no_wildcard_with_replace_flag_false(self, mock_hook):
+ """
+ With replace=False and a source_object without wildcard, the operator
+ must list the source bucket and then the destination bucket with the
+ same prefix, so that objects already present can be skipped.
+
+ NOTE(review): only the order of the hook.list() calls is asserted; the
+ test does not check that nothing is rewritten when the object already
+ exists at the destination. SOURCE_OBJECT_NO_WILDCARD is presumably
+ "test_object.txt" (matching the expected prefix) - verify at its definition.
+ """
+ operator = GCSToGCSOperator(
+ task_id=TASK_ID,
+ source_bucket=TEST_BUCKET,
+ source_object=SOURCE_OBJECT_NO_WILDCARD,
+ destination_bucket=DESTINATION_BUCKET,
+ replace=False,
+ )
+
+ operator.execute(None)
+ mock_calls = [
+ mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None),
+ mock.call(DESTINATION_BUCKET, prefix="test_object.txt",
delimiter=None),
+ ]
+ mock_hook.return_value.list.assert_has_calls(mock_calls)
+
@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
def test_execute_prefix_and_suffix(self, mock_hook):
operator = GCSToGCSOperator(