Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-10-test deecbb9bf -> b0a09abed


[AIRFLOW-2451] Remove extra slash ('/') char when using wildcard in gcs_to_gcs 
operator

Closes #3355 from berislavlopac/AIRFLOW-2451

(cherry picked from commit 06e1806366cdb10af55527dbf567f7e0d5ac6d65)
Signed-off-by: Kaxil Naik <kaxiln...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b0a09abe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b0a09abe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b0a09abe

Branch: refs/heads/v1-10-test
Commit: b0a09abed82fcd984756717de59fd6e9faa2d46e
Parents: deecbb9
Author: Unknown <beris...@lopac.net>
Authored: Mon May 14 19:41:14 2018 +0100
Committer: Kaxil Naik <kaxiln...@gmail.com>
Committed: Mon May 14 19:43:07 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/gcs_to_gcs.py         | 85 ++++++++++++++------
 .../operators/test_gcs_to_gcs_operator.py       | 72 ++++++++++++-----
 2 files changed, 113 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0a09abe/airflow/contrib/operators/gcs_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_gcs.py 
b/airflow/contrib/operators/gcs_to_gcs.py
index 9bcf9d4..466e631 100644
--- a/airflow/contrib/operators/gcs_to_gcs.py
+++ b/airflow/contrib/operators/gcs_to_gcs.py
@@ -44,6 +44,12 @@ class 
GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator):
         storage bucket.
         If a wildcard is supplied in the source_object argument, this is the
         prefix that will be prepended to the final destination objects' paths.
+        Note that the source path's part before the wildcard will be removed;
+        if it needs to be retained it should be appended to destination_object.
+        For example, with prefix ``foo/*`` and destination_object ``blah/``, 
the
+        file ``foo/baz`` will be copied to ``blah/baz``; to retain the prefix 
write
+        the destination_object as e.g. ``blah/foo/``, in which case the copied 
file
+        will be named ``blah/foo/baz``.
     :type destination_object: string
     :param move_object: When move object is True, the object is moved instead
     of copied to the new location.
@@ -57,6 +63,44 @@ class 
GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator):
         For this to work, the service account making the request must have
         domain-wide delegation enabled.
     :type delegate_to: string
+
+    **Examples**:
+        The following Operator would copy a single file named
+        ``sales/sales-2017/january.avro`` in the ``data`` bucket to the file 
named
+        ``copied_sales/2017/january-backup.avro`` in the ``data_backup`` bucket 
::
+            copy_single_file = GoogleCloudStorageToGoogleCloudStorageOperator(
+                task_id='copy_single_file',
+                source_bucket='data',
+                source_object='sales/sales-2017/january.avro',
+                destination_bucket='data_backup',
+                destination_object='copied_sales/2017/january-backup.avro',
+                google_cloud_storage_conn_id=google_cloud_conn_id
+            )
+
+        The following Operator would copy all the Avro files from 
``sales/sales-2017``
+        folder (i.e. with names starting with that prefix) in ``data`` bucket 
to the
+        ``copied_sales/2017`` folder in the ``data_backup`` bucket. ::
+            copy_files = GoogleCloudStorageToGoogleCloudStorageOperator(
+                task_id='copy_files',
+                source_bucket='data',
+                source_object='sales/sales-2017/*.avro',
+                destination_bucket='data_backup',
+                destination_object='copied_sales/2017/',
+                google_cloud_storage_conn_id=google_cloud_conn_id
+            )
+
+        The following Operator would move all the Avro files from 
``sales/sales-2017``
+        folder (i.e. with names starting with that prefix) in ``data`` bucket 
to the
+        same folder in the ``data_backup`` bucket, deleting the original files 
in the
+        process. ::
+            move_files = GoogleCloudStorageToGoogleCloudStorageOperator(
+                task_id='move_files',
+                source_bucket='data',
+                source_object='sales/sales-2017/*.avro',
+                destination_bucket='data_backup',
+                move_object=True,
+                google_cloud_storage_conn_id=google_cloud_conn_id
+            )
     """
     template_fields = ('source_bucket', 'source_object', 'destination_bucket',
                        'destination_object',)
@@ -73,8 +117,8 @@ class 
GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator):
                  delegate_to=None,
                  *args,
                  **kwargs):
-        super(GoogleCloudStorageToGoogleCloudStorageOperator, self).__init__(
-            *args, **kwargs)
+        super(GoogleCloudStorageToGoogleCloudStorageOperator,
+              self).__init__(*args, **kwargs)
         self.source_bucket = source_bucket
         self.source_object = source_object
         self.destination_bucket = destination_bucket
@@ -82,6 +126,7 @@ class 
GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator):
         self.move_object = move_object
         self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
         self.delegate_to = delegate_to
+        self.wildcard = '*'
 
     def execute(self, context):
 
@@ -89,24 +134,22 @@ class 
GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator):
             google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
             delegate_to=self.delegate_to
         )
+        log_message = 'Executing copy of gs://{0}/{1} to gs://{2}/{3}'
 
-        if '*' in self.source_object:
-            wildcard_position = self.source_object.index('*')
-            objects = hook.list(self.source_bucket,
-                                prefix=self.source_object[:wildcard_position],
-                                delimiter=self.source_object[wildcard_position 
+ 1:])
+        if self.wildcard in self.source_object:
+            prefix, delimiter = self.source_object.split(self.wildcard, 1)
+            objects = hook.list(self.source_bucket, prefix=prefix, 
delimiter=delimiter)
 
             for source_object in objects:
-                if self.destination_object:
-                    destination_object = 
"{}/{}".format(self.destination_object,
-                                                        
source_object[wildcard_position:])
-                else:
+                if self.destination_object is None:
                     destination_object = source_object
-                self.log.info('Executing copy of gs://{0}/{1} to '
-                              'gs://{2}/{3}'.format(self.source_bucket,
-                                                    source_object,
-                                                    self.destination_bucket,
-                                                    destination_object))
+                else:
+                    destination_object = source_object.replace(prefix,
+                                                               
self.destination_object, 1)
+                self.log.info(
+                    log_message.format(self.source_bucket, source_object,
+                                       self.destination_bucket, 
destination_object)
+                )
 
                 hook.copy(self.source_bucket, source_object,
                           self.destination_bucket, destination_object)
@@ -115,13 +158,9 @@ class 
GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator):
 
         else:
             self.log.info(
-                'Executing copy of gs://{0}/{1} to '
-                'gs://{2}/{3}'.format(
-                    self.source_bucket,
-                    self.source_object,
-                    self.destination_bucket or self.source_bucket,
-                    self.destination_object or self.source_object
-                )
+                log_message.format(self.source_bucket, self.source_object,
+                                   self.destination_bucket or 
self.source_bucket,
+                                   self.destination_object or 
self.source_object)
             )
             hook.copy(self.source_bucket, self.source_object,
                       self.destination_bucket, self.destination_object)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0a09abe/tests/contrib/operators/test_gcs_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_gcs_to_gcs_operator.py 
b/tests/contrib/operators/test_gcs_to_gcs_operator.py
index 3c46c49..6093d0f 100644
--- a/tests/contrib/operators/test_gcs_to_gcs_operator.py
+++ b/tests/contrib/operators/test_gcs_to_gcs_operator.py
@@ -37,23 +37,13 @@ PREFIX = 'TEST'
 SOURCE_OBJECT_1 = '*test_object'
 SOURCE_OBJECT_2 = 'test_object*'
 SOURCE_OBJECT_3 = 'test*object'
+SOURCE_OBJECT_4 = 'test_object*.txt'
 DESTINATION_BUCKET = 'archive'
 DESTINATION_OBJECT_PREFIX = 'foo/bar'
 SOURCE_FILES_LIST = [
     'test_object/file1.txt',
     'test_object/file2.txt',
-    'some_other/file.txt'
-]
-MOCK_CALLS = [
-    mock.call(TEST_BUCKET, file_path, DESTINATION_BUCKET,
-              DESTINATION_OBJECT_PREFIX + '/' + file_path)
-    for file_path in SOURCE_FILES_LIST
-    if file_path.startswith(SOURCE_OBJECT_1)
-]
-MOCK_CALLS_EMPTY = [
-    mock.call(TEST_BUCKET, file_path, DESTINATION_BUCKET, file_path)
-    for file_path in SOURCE_FILES_LIST
-    if file_path.startswith(SOURCE_OBJECT_1)
+    'test_object/file3.json',
 ]
 
 
@@ -103,37 +93,77 @@ class 
GoogleCloudStorageToCloudStorageOperatorTest(unittest.TestCase):
             TEST_BUCKET, prefix="test", delimiter="object"
         )
 
+    # copy with wildcard
+
     @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
-    def test_execute_no_suffix_with_destination_object(self, mock_hook):
+    def test_execute_wildcard_with_destination_object(self, mock_hook):
         mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
         operator = GoogleCloudStorageToGoogleCloudStorageOperator(
             task_id=TASK_ID, source_bucket=TEST_BUCKET,
-            source_object=SOURCE_OBJECT_2,
+            source_object=SOURCE_OBJECT_4,
             destination_bucket=DESTINATION_BUCKET,
             destination_object=DESTINATION_OBJECT_PREFIX)
 
         operator.execute(None)
-        mock_hook.return_value.copy.assert_has_calls(MOCK_CALLS)
+        mock_calls = [
+            mock.call(TEST_BUCKET, 'test_object/file1.txt',
+                      DESTINATION_BUCKET, 'foo/bar/file1.txt'),
+            mock.call(TEST_BUCKET, 'test_object/file2.txt',
+                      DESTINATION_BUCKET, 'foo/bar/file2.txt'),
+        ]
+        mock_hook.return_value.copy.assert_has_calls(mock_calls)
 
     @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
-    def test_execute_no_suffix_without_destination_object(self, mock_hook):
+    def test_execute_wildcard_with_destination_object_retained_prefix(self, 
mock_hook):
         mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
         operator = GoogleCloudStorageToGoogleCloudStorageOperator(
             task_id=TASK_ID, source_bucket=TEST_BUCKET,
-            source_object=SOURCE_OBJECT_2,
+            source_object=SOURCE_OBJECT_4,
+            destination_bucket=DESTINATION_BUCKET,
+            destination_object='{}/{}'.format(DESTINATION_OBJECT_PREFIX,
+                                              SOURCE_OBJECT_2[:-1])
+        )
+
+        operator.execute(None)
+        mock_calls_retained = [
+            mock.call(TEST_BUCKET, 'test_object/file1.txt',
+                      DESTINATION_BUCKET, 'foo/bar/test_object/file1.txt'),
+            mock.call(TEST_BUCKET, 'test_object/file2.txt',
+                      DESTINATION_BUCKET, 'foo/bar/test_object/file2.txt'),
+        ]
+        mock_hook.return_value.copy.assert_has_calls(mock_calls_retained)
+
+    @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
+    def test_execute_wildcard_without_destination_object(self, mock_hook):
+        mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
+        operator = GoogleCloudStorageToGoogleCloudStorageOperator(
+            task_id=TASK_ID, source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_4,
             destination_bucket=DESTINATION_BUCKET)
 
         operator.execute(None)
-        mock_hook.return_value.copy.assert_has_calls(MOCK_CALLS_EMPTY)
+        mock_calls_none = [
+            mock.call(TEST_BUCKET, 'test_object/file1.txt',
+                      DESTINATION_BUCKET, 'test_object/file1.txt'),
+            mock.call(TEST_BUCKET, 'test_object/file2.txt',
+                      DESTINATION_BUCKET, 'test_object/file2.txt'),
+        ]
+        mock_hook.return_value.copy.assert_has_calls(mock_calls_none)
 
     @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook')
-    def test_execute_no_suffix_empty_destination_object(self, mock_hook):
+    def test_execute_wildcard_empty_destination_object(self, mock_hook):
         mock_hook.return_value.list.return_value = SOURCE_FILES_LIST
         operator = GoogleCloudStorageToGoogleCloudStorageOperator(
             task_id=TASK_ID, source_bucket=TEST_BUCKET,
-            source_object=SOURCE_OBJECT_2,
+            source_object=SOURCE_OBJECT_4,
             destination_bucket=DESTINATION_BUCKET,
             destination_object='')
 
         operator.execute(None)
-        mock_hook.return_value.copy.assert_has_calls(MOCK_CALLS_EMPTY)
+        mock_calls_empty = [
+            mock.call(TEST_BUCKET, 'test_object/file1.txt',
+                      DESTINATION_BUCKET, '/file1.txt'),
+            mock.call(TEST_BUCKET, 'test_object/file2.txt',
+                      DESTINATION_BUCKET, '/file2.txt'),
+        ]
+        mock_hook.return_value.copy.assert_has_calls(mock_calls_empty)

Reply via email to