[
https://issues.apache.org/jira/browse/BEAM-5959?focusedWorklogId=174358&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174358
]
ASF GitHub Bot logged work on BEAM-5959:
----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Dec/18 00:17
Start Date: 12/Dec/18 00:17
Worklog Time Spent: 10m
Work Description: chamikaramj commented on a change in pull request
#7050: [BEAM-5959] Reimplement GCS copies with rewrites.
URL: https://github.com/apache/beam/pull/7050#discussion_r240837808
##########
File path: sdks/python/apache_beam/io/gcp/gcsio.py
##########
@@ -275,63 +285,128 @@ def delete_batch(self, paths):
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
- def copy(self, src, dest):
+ def copy(self, src, dest, dest_kms_key_name=None, timeout_secs=3600,
+ max_bytes_rewritten_per_call=None):
"""Copies the given GCS object from src to dest.
Args:
src: GCS file path pattern in the form gs://<bucket>/<name>.
dest: GCS file path pattern in the form gs://<bucket>/<name>.
+ dest_kms_key_name: Experimental. No backwards compatibility guarantees.
+ Encrypt dest with this Cloud KMS key. If None, will use dest bucket
+ encryption defaults.
+ timeout_secs: Experimental. No backwards compatibility guarantees. Wait
+ about this long for the operation to complete.
+ max_bytes_rewritten_per_call: Experimental. No backwards compatibility
+ guarantees. Each rewrite API call will return after these many bytes.
+ Used for testing.
+
+ Raises:
+ TimeoutError on timeout.
"""
src_bucket, src_path = parse_gcs_path(src)
dest_bucket, dest_path = parse_gcs_path(dest)
- request = storage.StorageObjectsCopyRequest(
+ request = storage.StorageObjectsRewriteRequest(
sourceBucket=src_bucket,
sourceObject=src_path,
destinationBucket=dest_bucket,
- destinationObject=dest_path)
- self.client.objects.Copy(request)
+ destinationObject=dest_path,
+ destinationKmsKeyName=dest_kms_key_name,
+ maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
+ start_time = time.time()
+ response = self.client.objects.Rewrite(request)
+ while not response.done:
+ logging.debug(
+ 'Rewrite progress: %d / %d bytes, %s to %s',
+ response.totalBytesRewritten, response.objectSize, src, dest)
+ elapsed = time.time() - start_time
+ if elapsed > timeout_secs:
+ raise TimeoutError(
+ 'Aborting rewrite after %d seconds: %s to %s' % (
+ elapsed, src, dest))
+ request.rewriteToken = response.rewriteToken
+ response = self.client.objects.Rewrite(request)
+ if self._rewrite_cb is not None:
+ self._rewrite_cb(response)
+
+ logging.debug('Rewrite done: %s to %s', src, dest)
# We intentionally do not decorate this method with a retry, as retrying is
# handled in BatchApiRequest.Execute().
- def copy_batch(self, src_dest_pairs):
+ def copy_batch(self, src_dest_pairs, dest_kms_key_name=None,
+ timeout_secs=3600, max_bytes_rewritten_per_call=None):
Review comment:
Are we sure that 3600 sec will be enough for existing pipelines? I would
not be surprised if some users have pipelines whose copies exceed this
limit. Can we keep the current default?
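
One way to read "keep the current default" is to make the deadline opt-in, so
that timeout_secs=None means no client-side timeout. The sketch below is
only an illustration of that idea, not code from the pull request:
check_deadline, RewriteTimeoutError, and the now parameter are hypothetical
names introduced here.

```python
import time


class RewriteTimeoutError(Exception):
    """Hypothetical error type for this sketch; not part of apache_beam."""


def check_deadline(start_time, timeout_secs, now=None):
    """Raises RewriteTimeoutError once timeout_secs has elapsed.

    Assumption: timeout_secs=None disables the check entirely, which would
    preserve the pre-change behavior of having no client-side deadline.
    The now argument exists only so the check can be tested without sleeping.
    """
    if timeout_secs is None:
        return
    current = time.time() if now is None else now
    elapsed = current - start_time
    if elapsed > timeout_secs:
        raise RewriteTimeoutError(
            'Aborting rewrite after %d seconds' % elapsed)
```

A rewrite loop could call check_deadline(start_time, timeout_secs) once per
iteration; callers who want a bound would pass an explicit value such as 3600.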
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 174358)
Time Spent: 3h 50m (was: 3h 40m)
> Add Cloud KMS support to GCS copies
> -----------------------------------
>
> Key: BEAM-5959
> URL: https://issues.apache.org/jira/browse/BEAM-5959
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp, sdk-py-core
> Reporter: Udi Meiri
> Assignee: Udi Meiri
> Priority: Major
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> Beam SDK currently uses the CopyTo GCS API call, which doesn't support
> copying objects that use Customer Managed Encryption Keys (CMEK).
> CMEKs are managed in Cloud KMS.
> Items (for Java and Python SDKs):
> - Update clients to versions that support KMS keys.
> - Change copyTo API calls to use rewriteTo (Python - directly, Java -
> possibly convert copyTo API call to use client library)
> - Add unit tests.
> - Add basic tests (DirectRunner and GCS buckets with CMEK).
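
The switch from a single copy call to a rewrite call changes the control flow:
a rewrite may return before the object is fully copied, so the caller repeats
the call with the returned token until the response reports done. The sketch
below shows that loop against an in-memory stand-in. FakeRewriteClient,
copy_with_rewrite, and the gs:// paths are hypothetical; only the response
field names (done, rewriteToken, totalBytesRewritten, objectSize) are taken
from the diff above.

```python
import collections

# Hypothetical response shape, mirroring the fields the diff reads from the
# rewrite response.
RewriteResponse = collections.namedtuple(
    'RewriteResponse',
    ['done', 'rewriteToken', 'totalBytesRewritten', 'objectSize'])


class FakeRewriteClient(object):
    """In-memory stand-in for a storage client; not a real GCS client."""

    def __init__(self, object_size, bytes_per_call):
        self.object_size = object_size
        self.bytes_per_call = bytes_per_call
        self.calls = 0
        self._progress = {}  # rewrite token -> bytes rewritten so far

    def rewrite(self, src, dest, rewrite_token=None):
        """Copies up to bytes_per_call more bytes and reports progress."""
        self.calls += 1
        written = self._progress.get(rewrite_token, 0)
        written = min(written + self.bytes_per_call, self.object_size)
        done = written >= self.object_size
        token = None if done else 'token-%d' % self.calls
        if token is not None:
            self._progress[token] = written
        return RewriteResponse(done, token, written, self.object_size)


def copy_with_rewrite(client, src, dest):
    """Calls rewrite repeatedly, passing the token back, until done."""
    response = client.rewrite(src, dest)
    while not response.done:
        response = client.rewrite(
            src, dest, rewrite_token=response.rewriteToken)
    return response


client = FakeRewriteClient(object_size=10, bytes_per_call=4)
result = copy_with_rewrite(client, 'gs://src-bucket/obj', 'gs://dest-bucket/obj')
```

Capping the bytes handled per call, as the fake does here, is also what makes
the multi-call path easy to exercise in a unit test.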