[
https://issues.apache.org/jira/browse/AIRFLOW-3205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675270#comment-16675270
]
ASF GitHub Bot commented on AIRFLOW-3205:
-----------------------------------------
Fokko closed pull request #4084: [AIRFLOW-3205] Support multipart uploads to GCS
URL: https://github.com/apache/incubator-airflow/pull/4084
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index 1ece4dde1b..dc92b0cb3e 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -177,7 +177,8 @@ def download(self, bucket, object, filename=None):
# pylint:disable=redefined-builtin
def upload(self, bucket, object, filename,
- mime_type='application/octet-stream', gzip=False):
+ mime_type='application/octet-stream', gzip=False,
+ multipart=False, num_retries=0):
"""
Uploads a local file to Google Cloud Storage.
@@ -191,6 +192,14 @@ def upload(self, bucket, object, filename,
:type mime_type: str
:param gzip: Option to compress file for upload
:type gzip: bool
+ :param multipart: If True, the upload will be split into multiple HTTP requests. The
+     default size is 256MiB per request. Pass a number instead of True to
+     specify the request size, which must be a multiple of 262144 (256KiB).
+ :type multipart: bool or int
+ :param num_retries: The number of times to attempt to re-upload the file (or individual
+     chunks, in the case of multipart uploads). Retries are attempted
+ with exponential backoff.
+ :type num_retries: int
"""
service = self.get_conn()
@@ -202,23 +211,45 @@ def upload(self, bucket, object, filename,
shutil.copyfileobj(f_in, f_out)
filename = filename_gz
- media = MediaFileUpload(filename, mime_type)
-
try:
- service \
- .objects() \
- .insert(bucket=bucket, name=object, media_body=media) \
- .execute()
+ if multipart:
+ if multipart is True:
+ chunksize = 256 * 1024 * 1024
+ else:
+ chunksize = multipart
+
+ if chunksize % (256 * 1024) > 0 or chunksize < 0:
+ raise ValueError("Multipart size is not a multiple of
262144 (256KiB)")
+
+ media = MediaFileUpload(filename, mimetype=mime_type,
+ chunksize=chunksize, resumable=True)
+
+ request = service.objects().insert(bucket=bucket, name=object, media_body=media)
+ response = None
+ while response is None:
+ status, response = request.next_chunk(num_retries=num_retries)
+ if status:
+ self.log.info("Upload progress %.1f%%",
status.progress() * 100)
+
+ else:
+ media = MediaFileUpload(filename, mime_type)
+
+ service \
+ .objects() \
+ .insert(bucket=bucket, name=object, media_body=media) \
+ .execute(num_retries=num_retries)
- # Clean up gzip file
- if gzip:
- os.remove(filename)
- return True
except errors.HttpError as ex:
if ex.resp['status'] == '404':
return False
raise
+ finally:
+ if gzip:
+ os.remove(filename)
+
+ return True
+
# pylint:disable=redefined-builtin
def exists(self, bucket, object):
"""
diff --git a/tests/contrib/hooks/test_gcs_hook.py b/tests/contrib/hooks/test_gcs_hook.py
index ed3dce9a5e..eea79a376c 100644
--- a/tests/contrib/hooks/test_gcs_hook.py
+++ b/tests/contrib/hooks/test_gcs_hook.py
@@ -18,6 +18,8 @@
# under the License.
import unittest
+import tempfile
+import os
from airflow.contrib.hooks import gcs_hook
from airflow.exceptions import AirflowException
@@ -339,3 +341,143 @@ def test_delete_nonexisting_object(self, mock_service):
response = self.gcs_hook.delete(bucket=test_bucket, object=test_object)
self.assertFalse(response)
+
+
+class TestGoogleCloudStorageHookUpload(unittest.TestCase):
+ def setUp(self):
+ with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__')):
+ self.gcs_hook = gcs_hook.GoogleCloudStorageHook(
+ google_cloud_storage_conn_id='test'
+ )
+
+ # generate a 384KiB test file (larger than the minimum 256KiB multipart chunk size)
+ self.testfile = tempfile.NamedTemporaryFile(delete=False)
+ self.testfile.write(b"x" * 393216)
+ self.testfile.flush()
+
+ def tearDown(self):
+ os.unlink(self.testfile.name)
+
+ @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn'))
+ def test_upload(self, mock_service):
+ test_bucket = 'test_bucket'
+ test_object = 'test_object'
+
+ (mock_service.return_value.objects.return_value
+ .insert.return_value.execute.return_value) = {
+ "kind": "storage#object",
+ "id": "{}/{}/0123456789012345".format(test_bucket, test_object),
+ "name": test_object,
+ "bucket": test_bucket,
+ "generation": "0123456789012345",
+ "contentType": "application/octet-stream",
+ "timeCreated": "2018-03-15T16:51:02.502Z",
+ "updated": "2018-03-15T16:51:02.502Z",
+ "storageClass": "MULTI_REGIONAL",
+ "timeStorageClassUpdated": "2018-03-15T16:51:02.502Z",
+ "size": "393216",
+ "md5Hash": "leYUJBUWrRtks1UeUFONJQ==",
+ "crc32c": "xgdNfQ==",
+ "etag": "CLf4hODk7tkCEAE="
+ }
+
+ response = self.gcs_hook.upload(test_bucket,
+ test_object,
+ self.testfile.name)
+
+ self.assertTrue(response)
+
+ @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn'))
+ def test_upload_gzip(self, mock_service):
+ test_bucket = 'test_bucket'
+ test_object = 'test_object'
+
+ (mock_service.return_value.objects.return_value
+ .insert.return_value.execute.return_value) = {
+ "kind": "storage#object",
+ "id": "{}/{}/0123456789012345".format(test_bucket, test_object),
+ "name": test_object,
+ "bucket": test_bucket,
+ "generation": "0123456789012345",
+ "contentType": "application/octet-stream",
+ "timeCreated": "2018-03-15T16:51:02.502Z",
+ "updated": "2018-03-15T16:51:02.502Z",
+ "storageClass": "MULTI_REGIONAL",
+ "timeStorageClassUpdated": "2018-03-15T16:51:02.502Z",
+ "size": "393216",
+ "md5Hash": "leYUJBUWrRtks1UeUFONJQ==",
+ "crc32c": "xgdNfQ==",
+ "etag": "CLf4hODk7tkCEAE="
+ }
+
+ response = self.gcs_hook.upload(test_bucket,
+ test_object,
+ self.testfile.name,
+ gzip=True)
+ self.assertFalse(os.path.exists(self.testfile.name + '.gz'))
+ self.assertTrue(response)
+
+ @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn'))
+ def test_upload_gzip_error(self, mock_service):
+ test_bucket = 'test_bucket'
+ test_object = 'test_object'
+
+ (mock_service.return_value.objects.return_value
+ .insert.return_value.execute.side_effect) = HttpError(
+ resp={'status': '404'}, content=EMPTY_CONTENT)
+
+ response = self.gcs_hook.upload(test_bucket,
+ test_object,
+ self.testfile.name,
+ gzip=True)
+ self.assertFalse(os.path.exists(self.testfile.name + '.gz'))
+ self.assertFalse(response)
+
+ @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn'))
+ def test_upload_multipart(self, mock_service):
+ test_bucket = 'test_bucket'
+ test_object = 'test_object'
+
+ class MockProgress:
+ def __init__(self, value):
+ self.value = value
+
+ def progress(self):
+ return self.value
+
+ (mock_service.return_value.objects.return_value
+ .insert.return_value.next_chunk.side_effect) = [
+ (MockProgress(0.66), None),
+ (MockProgress(1.0), {
+ "kind": "storage#object",
+ "id": "{}/{}/0123456789012345".format(test_bucket,
test_object),
+ "name": test_object,
+ "bucket": test_bucket,
+ "generation": "0123456789012345",
+ "contentType": "application/octet-stream",
+ "timeCreated": "2018-03-15T16:51:02.502Z",
+ "updated": "2018-03-15T16:51:02.502Z",
+ "storageClass": "MULTI_REGIONAL",
+ "timeStorageClassUpdated": "2018-03-15T16:51:02.502Z",
+ "size": "393216",
+ "md5Hash": "leYUJBUWrRtks1UeUFONJQ==",
+ "crc32c": "xgdNfQ==",
+ "etag": "CLf4hODk7tkCEAE="
+ })
+ ]
+
+ response = self.gcs_hook.upload(test_bucket,
+ test_object,
+ self.testfile.name,
+ multipart=True)
+
+ self.assertTrue(response)
+
+ @mock.patch(GCS_STRING.format('GoogleCloudStorageHook.get_conn'))
+ def test_upload_multipart_wrong_chunksize(self, mock_service):
+ test_bucket = 'test_bucket'
+ test_object = 'test_object'
+
+ with self.assertRaises(ValueError):
+ self.gcs_hook.upload(test_bucket, test_object,
+ self.testfile.name, multipart=123)
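
For readers of this thread, a minimal usage sketch of the new parameters follows. The
connection id, bucket, object and local file names are hypothetical placeholders, not
part of the PR:

    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

    hook = GoogleCloudStorageHook(google_cloud_storage_conn_id='google_cloud_default')

    # resumable upload in the default 256MiB chunks, retrying each chunk up to
    # 5 times with exponential backoff; returns True on success, False on a 404
    uploaded = hook.upload(bucket='example-bucket',
                           object='backups/large-dump.tar',
                           filename='/tmp/large-dump.tar',
                           multipart=True,
                           num_retries=5)

    # alternatively, pass an explicit chunk size, which must be a
    # whole multiple of 262144 bytes (256KiB)
    uploaded = hook.upload(bucket='example-bucket',
                           object='backups/large-dump.tar',
                           filename='/tmp/large-dump.tar',
                           multipart=32 * 262144,  # 8MiB per request
                           num_retries=5)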
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> GCS: Support multipart upload
> -----------------------------
>
> Key: AIRFLOW-3205
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3205
> Project: Apache Airflow
> Issue Type: Improvement
> Components: gcp
> Reporter: Gordon Ball
> Priority: Minor
> Fix For: 2.0.0
>
>
> GoogleCloudStorageHook currently only provides support for uploading files in
> a single HTTP request. This means that uploads fail with SSL errors for files
> larger than 2GiB (presumably an int32 overflow; this might depend on which SSL
> library is being used). Multipart uploads should be supported to allow large
> uploads, and possibly increase reliability for smaller uploads.
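For context on the 2GiB figure above, a quick check of the arithmetic (my reading of
the issue, not stated in it explicitly): 2GiB is one byte more than the largest signed
32-bit integer, so a single-request body of that size can overflow a 32-bit length
counter, while chunked uploads keep every individual request far below the limit:

    # 2GiB vs. the largest signed 32-bit integer
    two_gib = 2 * 1024 ** 3      # 2147483648 bytes
    int32_max = 2 ** 31 - 1      # 2147483647
    assert two_gib > int32_max

    # the default 256MiB chunk used by the PR stays well under the limit
    # and is a whole multiple of 262144 (256KiB), as required
    chunk = 256 * 1024 ** 2      # 268435456 bytes
    assert chunk % 262144 == 0 and chunk < int32_max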
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)