Repository: incubator-airflow Updated Branches: refs/heads/v1-10-test deecbb9bf -> b0a09abed
[AIRFLOW-2451] Remove extra slash ('/') char when using wildcard in gcs_to_gcs operator Closes #3355 from berislavlopac/AIRFLOW-2451 (cherry picked from commit 06e1806366cdb10af55527dbf567f7e0d5ac6d65) Signed-off-by: Kaxil Naik <kaxiln...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b0a09abe Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b0a09abe Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b0a09abe Branch: refs/heads/v1-10-test Commit: b0a09abed82fcd984756717de59fd6e9faa2d46e Parents: deecbb9 Author: Unknown <beris...@lopac.net> Authored: Mon May 14 19:41:14 2018 +0100 Committer: Kaxil Naik <kaxiln...@gmail.com> Committed: Mon May 14 19:43:07 2018 +0100 ---------------------------------------------------------------------- airflow/contrib/operators/gcs_to_gcs.py | 85 ++++++++++++++------ .../operators/test_gcs_to_gcs_operator.py | 72 ++++++++++++----- 2 files changed, 113 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0a09abe/airflow/contrib/operators/gcs_to_gcs.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_to_gcs.py b/airflow/contrib/operators/gcs_to_gcs.py index 9bcf9d4..466e631 100644 --- a/airflow/contrib/operators/gcs_to_gcs.py +++ b/airflow/contrib/operators/gcs_to_gcs.py @@ -44,6 +44,12 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator): storage bucket. If a wildcard is supplied in the source_object argument, this is the prefix that will be prepended to the final destination objects' paths. + Note that the source path's part before the wildcard will be removed; + if it needs to be retained it should be appended to destination_object. 
+ For example, with prefix ``foo/*`` and destination_object ``blah/``, the + file ``foo/baz`` will be copied to ``blah/baz``; to retain the prefix write + the destination_object as e.g. ``blah/foo/``, in which case the copied file + will be named ``blah/foo/baz``. :type destination_object: string :param move_object: When move object is True, the object is moved instead of copied to the new location. @@ -57,6 +63,44 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator): For this to work, the service account making the request must have domain-wide delegation enabled. :type delegate_to: string + + **Examples**: + The following Operator would copy a single file named + ``sales/sales-2017/january.avro`` in the ``data`` bucket to the file named + ``copied_sales/2017/january-backup.avro`` in the ``data_backup`` bucket :: + copy_single_file = GoogleCloudStorageToGoogleCloudStorageOperator( + task_id='copy_single_file', + source_bucket='data', + source_object='sales/sales-2017/january.avro', + destination_bucket='data_backup', + destination_object='copied_sales/2017/january-backup.avro', + google_cloud_storage_conn_id=google_cloud_conn_id + ) + + The following Operator would copy all the Avro files from ``sales/sales-2017`` + folder (i.e. with names starting with that prefix) in ``data`` bucket to the + ``copied_sales/2017`` folder in the ``data_backup`` bucket. :: + copy_files = GoogleCloudStorageToGoogleCloudStorageOperator( + task_id='copy_files', + source_bucket='data', + source_object='sales/sales-2017/*.avro', + destination_bucket='data_backup', + destination_object='copied_sales/2017/', + google_cloud_storage_conn_id=google_cloud_conn_id + ) + + The following Operator would move all the Avro files from ``sales/sales-2017`` + folder (i.e. with names starting with that prefix) in ``data`` bucket to the + same folder in the ``data_backup`` bucket, deleting the original files in the + process.
:: + move_files = GoogleCloudStorageToGoogleCloudStorageOperator( + task_id='move_files', + source_bucket='data', + source_object='sales/sales-2017/*.avro', + destination_bucket='data_backup', + move_object=True, + google_cloud_storage_conn_id=google_cloud_conn_id + ) """ template_fields = ('source_bucket', 'source_object', 'destination_bucket', 'destination_object',) @@ -73,8 +117,8 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator): delegate_to=None, *args, **kwargs): - super(GoogleCloudStorageToGoogleCloudStorageOperator, self).__init__( - *args, **kwargs) + super(GoogleCloudStorageToGoogleCloudStorageOperator, + self).__init__(*args, **kwargs) self.source_bucket = source_bucket self.source_object = source_object self.destination_bucket = destination_bucket @@ -82,6 +126,7 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator): self.move_object = move_object self.google_cloud_storage_conn_id = google_cloud_storage_conn_id self.delegate_to = delegate_to + self.wildcard = '*' def execute(self, context): @@ -89,24 +134,22 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator): google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, delegate_to=self.delegate_to ) + log_message = 'Executing copy of gs://{0}/{1} to gs://{2}/{3}' - if '*' in self.source_object: - wildcard_position = self.source_object.index('*') - objects = hook.list(self.source_bucket, - prefix=self.source_object[:wildcard_position], - delimiter=self.source_object[wildcard_position + 1:]) + if self.wildcard in self.source_object: + prefix, delimiter = self.source_object.split(self.wildcard, 1) + objects = hook.list(self.source_bucket, prefix=prefix, delimiter=delimiter) for source_object in objects: - if self.destination_object: - destination_object = "{}/{}".format(self.destination_object, - source_object[wildcard_position:]) - else: + if self.destination_object is None: destination_object = source_object - self.log.info('Executing copy of 
gs://{0}/{1} to ' - 'gs://{2}/{3}'.format(self.source_bucket, - source_object, - self.destination_bucket, - destination_object)) + else: + destination_object = source_object.replace(prefix, + self.destination_object, 1) + self.log.info( + log_message.format(self.source_bucket, source_object, + self.destination_bucket, destination_object) + ) hook.copy(self.source_bucket, source_object, self.destination_bucket, destination_object) @@ -115,13 +158,9 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator): else: self.log.info( - 'Executing copy of gs://{0}/{1} to ' - 'gs://{2}/{3}'.format( - self.source_bucket, - self.source_object, - self.destination_bucket or self.source_bucket, - self.destination_object or self.source_object - ) + log_message.format(self.source_bucket, self.source_object, + self.destination_bucket or self.source_bucket, + self.destination_object or self.source_object) ) hook.copy(self.source_bucket, self.source_object, self.destination_bucket, self.destination_object) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0a09abe/tests/contrib/operators/test_gcs_to_gcs_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_gcs_to_gcs_operator.py b/tests/contrib/operators/test_gcs_to_gcs_operator.py index 3c46c49..6093d0f 100644 --- a/tests/contrib/operators/test_gcs_to_gcs_operator.py +++ b/tests/contrib/operators/test_gcs_to_gcs_operator.py @@ -37,23 +37,13 @@ PREFIX = 'TEST' SOURCE_OBJECT_1 = '*test_object' SOURCE_OBJECT_2 = 'test_object*' SOURCE_OBJECT_3 = 'test*object' +SOURCE_OBJECT_4 = 'test_object*.txt' DESTINATION_BUCKET = 'archive' DESTINATION_OBJECT_PREFIX = 'foo/bar' SOURCE_FILES_LIST = [ 'test_object/file1.txt', 'test_object/file2.txt', - 'some_other/file.txt' -] -MOCK_CALLS = [ - mock.call(TEST_BUCKET, file_path, DESTINATION_BUCKET, - DESTINATION_OBJECT_PREFIX + '/' + file_path) - for file_path in SOURCE_FILES_LIST - if 
file_path.startswith(SOURCE_OBJECT_1) -] -MOCK_CALLS_EMPTY = [ - mock.call(TEST_BUCKET, file_path, DESTINATION_BUCKET, file_path) - for file_path in SOURCE_FILES_LIST - if file_path.startswith(SOURCE_OBJECT_1) + 'test_object/file3.json', ] @@ -103,37 +93,77 @@ class GoogleCloudStorageToCloudStorageOperatorTest(unittest.TestCase): TEST_BUCKET, prefix="test", delimiter="object" ) + # copy with wildcard + @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook') - def test_execute_no_suffix_with_destination_object(self, mock_hook): + def test_execute_wildcard_with_destination_object(self, mock_hook): mock_hook.return_value.list.return_value = SOURCE_FILES_LIST operator = GoogleCloudStorageToGoogleCloudStorageOperator( task_id=TASK_ID, source_bucket=TEST_BUCKET, - source_object=SOURCE_OBJECT_2, + source_object=SOURCE_OBJECT_4, destination_bucket=DESTINATION_BUCKET, destination_object=DESTINATION_OBJECT_PREFIX) operator.execute(None) - mock_hook.return_value.copy.assert_has_calls(MOCK_CALLS) + mock_calls = [ + mock.call(TEST_BUCKET, 'test_object/file1.txt', + DESTINATION_BUCKET, 'foo/bar/file1.txt'), + mock.call(TEST_BUCKET, 'test_object/file2.txt', + DESTINATION_BUCKET, 'foo/bar/file2.txt'), + ] + mock_hook.return_value.copy.assert_has_calls(mock_calls) @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook') - def test_execute_no_suffix_without_destination_object(self, mock_hook): + def test_execute_wildcard_with_destination_object_retained_prefix(self, mock_hook): mock_hook.return_value.list.return_value = SOURCE_FILES_LIST operator = GoogleCloudStorageToGoogleCloudStorageOperator( task_id=TASK_ID, source_bucket=TEST_BUCKET, - source_object=SOURCE_OBJECT_2, + source_object=SOURCE_OBJECT_4, + destination_bucket=DESTINATION_BUCKET, + destination_object='{}/{}'.format(DESTINATION_OBJECT_PREFIX, + SOURCE_OBJECT_2[:-1]) + ) + + operator.execute(None) + mock_calls_retained = [ + mock.call(TEST_BUCKET, 'test_object/file1.txt', + 
DESTINATION_BUCKET, 'foo/bar/test_object/file1.txt'), + mock.call(TEST_BUCKET, 'test_object/file2.txt', + DESTINATION_BUCKET, 'foo/bar/test_object/file2.txt'), + ] + mock_hook.return_value.copy.assert_has_calls(mock_calls_retained) + + @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook') + def test_execute_wildcard_without_destination_object(self, mock_hook): + mock_hook.return_value.list.return_value = SOURCE_FILES_LIST + operator = GoogleCloudStorageToGoogleCloudStorageOperator( + task_id=TASK_ID, source_bucket=TEST_BUCKET, + source_object=SOURCE_OBJECT_4, destination_bucket=DESTINATION_BUCKET) operator.execute(None) - mock_hook.return_value.copy.assert_has_calls(MOCK_CALLS_EMPTY) + mock_calls_none = [ + mock.call(TEST_BUCKET, 'test_object/file1.txt', + DESTINATION_BUCKET, 'test_object/file1.txt'), + mock.call(TEST_BUCKET, 'test_object/file2.txt', + DESTINATION_BUCKET, 'test_object/file2.txt'), + ] + mock_hook.return_value.copy.assert_has_calls(mock_calls_none) @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook') - def test_execute_no_suffix_empty_destination_object(self, mock_hook): + def test_execute_wildcard_empty_destination_object(self, mock_hook): mock_hook.return_value.list.return_value = SOURCE_FILES_LIST operator = GoogleCloudStorageToGoogleCloudStorageOperator( task_id=TASK_ID, source_bucket=TEST_BUCKET, - source_object=SOURCE_OBJECT_2, + source_object=SOURCE_OBJECT_4, destination_bucket=DESTINATION_BUCKET, destination_object='') operator.execute(None) - mock_hook.return_value.copy.assert_has_calls(MOCK_CALLS_EMPTY) + mock_calls_empty = [ + mock.call(TEST_BUCKET, 'test_object/file1.txt', + DESTINATION_BUCKET, '/file1.txt'), + mock.call(TEST_BUCKET, 'test_object/file2.txt', + DESTINATION_BUCKET, '/file2.txt'), + ] + mock_hook.return_value.copy.assert_has_calls(mock_calls_empty)