Repository: incubator-airflow
Updated Branches:
  refs/heads/master e95a1251b -> 17d3d1d9d


[AIRFLOW-2330] Do not append destination prefix if not given

Closes #3233 from berislavlopac/AIRFLOW-2330


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/17d3d1d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/17d3d1d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/17d3d1d9

Branch: refs/heads/master
Commit: 17d3d1d9dc87c0bbb03de049607c2ad76a4fd747
Parents: e95a125
Author: Berislav Lopac <[email protected]>
Authored: Thu Apr 19 10:26:23 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Thu Apr 19 10:26:23 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/operators/gcs_to_gcs.py         | 41 ++++++++------
 .../operators/test_gcs_to_gcs_operator.py       | 58 +++++++++++++++++++-
 2 files changed, 81 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/17d3d1d9/airflow/contrib/operators/gcs_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_gcs.py b/airflow/contrib/operators/gcs_to_gcs.py
index dc67ddc..6acc517 100644
--- a/airflow/contrib/operators/gcs_to_gcs.py
+++ b/airflow/contrib/operators/gcs_to_gcs.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,7 +24,7 @@ from airflow.utils.decorators import apply_defaults
 
 class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator):
     """
-    Copies an object from a bucket to another, with renaming if requested.
+    Copies objects from a bucket to another, with renaming if requested.
 
     :param source_bucket: The source Google cloud storage bucket where the object is.
     :type source_bucket: string
@@ -43,8 +43,7 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator):
     destination Google cloud
         storage bucket.
         If a wildcard is supplied in the source_object argument, this is the
-        folder that the files will be
-        copied to in the destination bucket.
+        prefix that will be prepended to the final destination objects' paths.
     :type destination_object: string
     :param move_object: When move object is True, the object is moved instead
     of copied to the new location.
@@ -96,24 +95,34 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator):
             objects = hook.list(self.source_bucket,
                                 prefix=self.source_object[:wildcard_position],
                                 delimiter=self.source_object[wildcard_position + 1:])
+
             for source_object in objects:
+                if self.destination_object:
+                    destination_object = "{}/{}".format(self.destination_object,
+                                                        source_object)
+                else:
+                    destination_object = source_object
                 self.log.info('Executing copy of gs://{0}/{1} to '
-                              'gs://{2}/{3}/{1}'.format(self.source_bucket,
-                                                        source_object,
-                                                        self.destination_bucket,
-                                                        self.destination_object,
-                                                        source_object))
+                              'gs://{2}/{3}'.format(self.source_bucket,
+                                                    source_object,
+                                                    self.destination_bucket,
+                                                    destination_object))
+
                 hook.copy(self.source_bucket, source_object,
-                          self.destination_bucket, "{}/{}".format(self.destination_object,
-                                                                  source_object))
+                          self.destination_bucket, destination_object)
                 if self.move_object:
                     hook.delete(self.source_bucket, source_object)
 
         else:
-            self.log.info('Executing copy: %s, %s, %s, %s', self.source_bucket,
-                          self.source_object,
-                          self.destination_bucket or self.source_bucket,
-                          self.destination_object or self.source_object)
+            self.log.info(
+                'Executing copy of gs://{0}/{1} to '
+                'gs://{2}/{3}'.format(
+                    self.source_bucket,
+                    self.source_object,
+                    self.destination_bucket or self.source_bucket,
+                    self.destination_object or self.source_object
+                )
+            )
             hook.copy(self.source_bucket, self.source_object,
                       self.destination_bucket, self.destination_object)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/17d3d1d9/tests/contrib/operators/test_gcs_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_gcs_to_gcs_operator.py b/tests/contrib/operators/test_gcs_to_gcs_operator.py
index ac9faf3..3c46c49 100644
--- a/tests/contrib/operators/test_gcs_to_gcs_operator.py
+++ b/tests/contrib/operators/test_gcs_to_gcs_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,6 +38,23 @@ SOURCE_OBJECT_1 = '*test_object'
 SOURCE_OBJECT_2 = 'test_object*'
 SOURCE_OBJECT_3 = 'test*object'
 DESTINATION_BUCKET = 'archive'
+DESTINATION_OBJECT_PREFIX = 'foo/bar'
+SOURCE_FILES_LIST = [
+    'test_object/file1.txt',
+    'test_object/file2.txt',
+    'some_other/file.txt'
+]
+MOCK_CALLS = [
+    mock.call(TEST_BUCKET, file_path, DESTINATION_BUCKET,
+              DESTINATION_OBJECT_PREFIX + '/' + file_path)
+    for file_path in SOURCE_FILES_LIST
+    if file_path.startswith(SOURCE_OBJECT_1)
+]
+MOCK_CALLS_EMPTY = [
+    mock.call(TEST_BUCKET, file_path, DESTINATION_BUCKET, file_path)
+    for file_path in SOURCE_FILES_LIST
+    if file_path.startswith(SOURCE_OBJECT_1)
+]
 
 
 class GoogleCloudStorageToCloudStorageOperatorTest(unittest.TestCase):
@@ -46,6 +63,8 @@ class GoogleCloudStorageToCloudStorageOperatorTest(unittest.TestCase):
     no_prefix: *test_object
     no_suffix: test_object*
     prefix_and_suffix: test*object
+
+    Also tests the destination_object as prefix when the wildcard is used.
     """
 
     @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
@@ -83,3 +102,38 @@ class GoogleCloudStorageToCloudStorageOperatorTest(unittest.TestCase):
         mock_hook.return_value.list.assert_called_once_with(
             TEST_BUCKET, prefix="test", delimiter="object"
         )
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
+    def test_execute_no_suffix_with_destination_object(self, mock_hook):
+        mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
+        operator = GoogleCloudStorageToGoogleCloudStorageOperator(
+            task_id=TASK_ID, source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_2,
+            destination_bucket=DESTINATION_BUCKET,
+            destination_object=DESTINATION_OBJECT_PREFIX)
+
+        operator.execute(None)
+        mock_hook.return_value.copy.assert_has_calls(MOCK_CALLS)
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
+    def test_execute_no_suffix_without_destination_object(self, mock_hook):
+        mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
+        operator = GoogleCloudStorageToGoogleCloudStorageOperator(
+            task_id=TASK_ID, source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_2,
+            destination_bucket=DESTINATION_BUCKET)
+
+        operator.execute(None)
+        mock_hook.return_value.copy.assert_has_calls(MOCK_CALLS_EMPTY)
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
+    def test_execute_no_suffix_empty_destination_object(self, mock_hook):
+        mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
+        operator = GoogleCloudStorageToGoogleCloudStorageOperator(
+            task_id=TASK_ID, source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_2,
+            destination_bucket=DESTINATION_BUCKET,
+            destination_object='')
+
+        operator.execute(None)
+        mock_hook.return_value.copy.assert_has_calls(MOCK_CALLS_EMPTY)

Reply via email to