Repository: incubator-airflow Updated Branches: refs/heads/master df05546f8 -> ac5954d7f
[AIRFLOW-2575] Make gcs to gcs operator work with large files Use `GoogleCloudStorageHook.rewrite` instead of `copy` so that it works with files > 5TB Closes #3472 from torkjel/AIRFLOW-2575-gcs-to-gcs-operator-support-large-files Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ac5954d7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ac5954d7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ac5954d7 Branch: refs/heads/master Commit: ac5954d7f00a27ed41466b53f92cd37db426e38b Parents: df05546 Author: Torkjel Hongve <[email protected]> Authored: Fri Jun 8 16:55:44 2018 +0100 Committer: Kaxil Naik <[email protected]> Committed: Fri Jun 8 16:55:44 2018 +0100 ---------------------------------------------------------------------- airflow/contrib/operators/gcs_to_gcs.py | 8 ++++---- tests/contrib/operators/test_gcs_to_gcs_operator.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac5954d7/airflow/contrib/operators/gcs_to_gcs.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_to_gcs.py b/airflow/contrib/operators/gcs_to_gcs.py index e6d327b..256685f 100644 --- a/airflow/contrib/operators/gcs_to_gcs.py +++ b/airflow/contrib/operators/gcs_to_gcs.py @@ -151,8 +151,8 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator): self.destination_bucket, destination_object) ) - hook.copy(self.source_bucket, source_object, - self.destination_bucket, destination_object) + hook.rewrite(self.source_bucket, source_object, + self.destination_bucket, destination_object) if self.move_object: hook.delete(self.source_bucket, source_object) @@ -162,8 +162,8 @@ class GoogleCloudStorageToGoogleCloudStorageOperator(BaseOperator): 
self.destination_bucket or self.source_bucket, self.destination_object or self.source_object) ) - hook.copy(self.source_bucket, self.source_object, - self.destination_bucket, self.destination_object) + hook.rewrite(self.source_bucket, self.source_object, + self.destination_bucket, self.destination_object) if self.move_object: hook.delete(self.source_bucket, self.source_object) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac5954d7/tests/contrib/operators/test_gcs_to_gcs_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_gcs_to_gcs_operator.py b/tests/contrib/operators/test_gcs_to_gcs_operator.py index 6093d0f..6b866d1 100644 --- a/tests/contrib/operators/test_gcs_to_gcs_operator.py +++ b/tests/contrib/operators/test_gcs_to_gcs_operator.py @@ -111,7 +111,7 @@ class GoogleCloudStorageToCloudStorageOperatorTest(unittest.TestCase): mock.call(TEST_BUCKET, 'test_object/file2.txt', DESTINATION_BUCKET, 'foo/bar/file2.txt'), ] - mock_hook.return_value.copy.assert_has_calls(mock_calls) + mock_hook.return_value.rewrite.assert_has_calls(mock_calls) @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook') def test_execute_wildcard_with_destination_object_retained_prefix(self, mock_hook): @@ -131,7 +131,7 @@ class GoogleCloudStorageToCloudStorageOperatorTest(unittest.TestCase): mock.call(TEST_BUCKET, 'test_object/file2.txt', DESTINATION_BUCKET, 'foo/bar/test_object/file2.txt'), ] - mock_hook.return_value.copy.assert_has_calls(mock_calls_retained) + mock_hook.return_value.rewrite.assert_has_calls(mock_calls_retained) @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook') def test_execute_wildcard_without_destination_object(self, mock_hook): @@ -148,7 +148,7 @@ class GoogleCloudStorageToCloudStorageOperatorTest(unittest.TestCase): mock.call(TEST_BUCKET, 'test_object/file2.txt', DESTINATION_BUCKET, 'test_object/file2.txt'), ] - 
mock_hook.return_value.copy.assert_has_calls(mock_calls_none) + mock_hook.return_value.rewrite.assert_has_calls(mock_calls_none) @mock.patch('airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageHook') def test_execute_wildcard_empty_destination_object(self, mock_hook): @@ -166,4 +166,4 @@ class GoogleCloudStorageToCloudStorageOperatorTest(unittest.TestCase): mock.call(TEST_BUCKET, 'test_object/file2.txt', DESTINATION_BUCKET, '/file2.txt'), ] - mock_hook.return_value.copy.assert_has_calls(mock_calls_empty) + mock_hook.return_value.rewrite.assert_has_calls(mock_calls_empty)
