Repository: incubator-airflow Updated Branches: refs/heads/master e95a1251b -> 17d3d1d9d
[AIRFLOW-2330] Do not append destination prefix if not given Closes #3233 from berislavlopac/AIRFLOW-2330 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/17d3d1d9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/17d3d1d9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/17d3d1d9 Branch: refs/heads/master Commit: 17d3d1d9dc87c0bbb03de049607c2ad76a4fd747 Parents: e95a125 Author: Berislav Lopac <[email protected]> Authored: Thu Apr 19 10:26:23 2018 +0200 Committer: Fokko Driesprong <[email protected]> Committed: Thu Apr 19 10:26:23 2018 +0200 ---------------------------------------------------------------------- airflow/contrib/operators/gcs_to_gcs.py | 41 ++++++++------ .../operators/test_gcs_to_gcs_operator.py | 58 +++++++++++++++++++- 2 files changed, 81 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/17d3d1d9/airflow/contrib/operators/gcs_to_gcs.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_to_gcs.py b/airflow/contrib/operators/gcs_to_gcs.py index dc67ddc..6acc517 100644 --- a/airflow/contrib/operators/gcs_to_gcs.py +++ b/airflow/contrib/operators/gcs_to_gcs.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. 
You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,7 +24,7 @@ from airflow.utils.decorators import apply_defaults class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator): """ - Copies an object from a bucket to another, with renaming if requested. + Copies objects from a bucket to another, with renaming if requested. :param source_bucket: The source Google cloud storage bucket where the object is. :type source_bucket: string @@ -43,8 +43,7 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator): destination Google cloud storage bucket. If a wildcard is supplied in the source_object argument, this is the - folder that the files will be - copied to in the destination bucket. + prefix that will be prepended to the final destination objects' paths. :type destination_object: string :param move_object: When move object is True, the object is moved instead of copied to the new location. 
@@ -96,24 +95,34 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator): objects = hook.list(self.source_bucket, prefix=self.source_object[:wildcard_position], delimiter=self.source_object[wildcard_position + 1:]) + for source_object in objects: + if self.destination_object: + destination_object = "{}/{}".format(self.destination_object, + source_object) + else: + destination_object = source_object self.log.info('Executing copy of gs://{0}/{1} to ' - 'gs://{2}/{3}/{1}'.format(self.source_bucket, - source_object, - self.destination_bucket, - self.destination_object, - source_object)) + 'gs://{2}/{3}'.format(self.source_bucket, + source_object, + self.destination_bucket, + destination_object)) + hook.copy(self.source_bucket, source_object, - self.destination_bucket, "{}/{}".format(self.destination_object, - source_object)) + self.destination_bucket, destination_object) if self.move_object: hook.delete(self.source_bucket, source_object) else: - self.log.info('Executing copy: %s, %s, %s, %s', self.source_bucket, - self.source_object, - self.destination_bucket or self.source_bucket, - self.destination_object or self.source_object) + self.log.info( + 'Executing copy of gs://{0}/{1} to ' + 'gs://{2}/{3}'.format( + self.source_bucket, + self.source_object, + self.destination_bucket or self.source_bucket, + self.destination_object or self.source_object + ) + ) hook.copy(self.source_bucket, self.source_object, self.destination_bucket, self.destination_object) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/17d3d1d9/tests/contrib/operators/test_gcs_to_gcs_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_gcs_to_gcs_operator.py b/tests/contrib/operators/test_gcs_to_gcs_operator.py index ac9faf3..3c46c49 100644 --- a/tests/contrib/operators/test_gcs_to_gcs_operator.py +++ b/tests/contrib/operators/test_gcs_to_gcs_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, 
Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -38,6 +38,23 @@ SOURCE_OBJECT_1 = '*test_object' SOURCE_OBJECT_2 = 'test_object*' SOURCE_OBJECT_3 = 'test*object' DESTINATION_BUCKET = 'archive' +DESTINATION_OBJECT_PREFIX = 'foo/bar' +SOURCE_FILES_LIST = [ + 'test_object/file1.txt', + 'test_object/file2.txt', + 'some_other/file.txt' +] +MOCK_CALLS = [ + mock.call(TEST_BUCKET, file_path, DESTINATION_BUCKET, + DESTINATION_OBJECT_PREFIX + '/' + file_path) + for file_path in SOURCE_FILES_LIST + if file_path.startswith(SOURCE_OBJECT_1) +] +MOCK_CALLS_EMPTY = [ + mock.call(TEST_BUCKET, file_path, DESTINATION_BUCKET, file_path) + for file_path in SOURCE_FILES_LIST + if file_path.startswith(SOURCE_OBJECT_1) +] class GoogleCloudStorageToCloudStorageOperatorTest(unittest.TestCase): @@ -46,6 +63,8 @@ class GoogleCloudStorageToCloudStorageOperatorTest(unittest.TestCase): no_prefix: *test_object no_suffix: test_object* prefix_and_suffix: test*object + + Also tests the destination_object as prefix when the wildcard is used. 
""" @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook') @@ -83,3 +102,38 @@ class GoogleCloudStorageToCloudStorageOperatorTest(unittest.TestCase): mock_hook.return_value.list.assert_called_once_with( TEST_BUCKET, prefix="test", delimiter="object" ) + + @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook') + def test_execute_no_suffix_with_destination_object(self, mock_hook): + mock_hook.return_value.list.return_value = SOURCE_FILES_LIST + operator = GoogleCloudStorageToGoogleCloudStorageOperator( + task_id=TASK_ID, source_bucket=TEST_BUCKET, + source_object=SOURCE_OBJECT_2, + destination_bucket=DESTINATION_BUCKET, + destination_object=DESTINATION_OBJECT_PREFIX) + + operator.execute(None) + mock_hook.return_value.copy.assert_has_calls(MOCK_CALLS) + + @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook') + def test_execute_no_suffix_without_destination_object(self, mock_hook): + mock_hook.return_value.list.return_value = SOURCE_FILES_LIST + operator = GoogleCloudStorageToGoogleCloudStorageOperator( + task_id=TASK_ID, source_bucket=TEST_BUCKET, + source_object=SOURCE_OBJECT_2, + destination_bucket=DESTINATION_BUCKET) + + operator.execute(None) + mock_hook.return_value.copy.assert_has_calls(MOCK_CALLS_EMPTY) + + @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook') + def test_execute_no_suffix_empty_destination_object(self, mock_hook): + mock_hook.return_value.list.return_value = SOURCE_FILES_LIST + operator = GoogleCloudStorageToGoogleCloudStorageOperator( + task_id=TASK_ID, source_bucket=TEST_BUCKET, + source_object=SOURCE_OBJECT_2, + destination_bucket=DESTINATION_BUCKET, + destination_object='') + + operator.execute(None) + mock_hook.return_value.copy.assert_has_calls(MOCK_CALLS_EMPTY)
