[ 
https://issues.apache.org/jira/browse/BEAM-5959?focusedWorklogId=174729&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174729
 ]

ASF GitHub Bot logged work on BEAM-5959:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Dec/18 02:21
            Start Date: 13/Dec/18 02:21
    Worklog Time Spent: 10m 
      Work Description: udim commented on a change in pull request #7050: 
[BEAM-5959] Reimplement GCS copies with rewrites.
URL: https://github.com/apache/beam/pull/7050#discussion_r241240028
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/gcsio.py
 ##########
 @@ -275,63 +285,128 @@ def delete_batch(self, paths):
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def copy(self, src, dest):
+  def copy(self, src, dest, dest_kms_key_name=None, timeout_secs=3600,
+           max_bytes_rewritten_per_call=None):
     """Copies the given GCS object from src to dest.
 
     Args:
       src: GCS file path pattern in the form gs://<bucket>/<name>.
       dest: GCS file path pattern in the form gs://<bucket>/<name>.
+      dest_kms_key_name: Experimental. No backwards compatibility guarantees.
+        Encrypt dest with this Cloud KMS key. If None, will use dest bucket
+        encryption defaults.
+      timeout_secs: Experimental. No backwards compatibility guarantees. Wait
+        about this long for the operation to complete.
+      max_bytes_rewritten_per_call: Experimental. No backwards compatibility
+        guarantees. Each rewrite API call will return after these many bytes.
+        Used for testing.
+
+    Raises:
+      TimeoutError on timeout.
     """
     src_bucket, src_path = parse_gcs_path(src)
     dest_bucket, dest_path = parse_gcs_path(dest)
-    request = storage.StorageObjectsCopyRequest(
+    request = storage.StorageObjectsRewriteRequest(
         sourceBucket=src_bucket,
         sourceObject=src_path,
         destinationBucket=dest_bucket,
-        destinationObject=dest_path)
-    self.client.objects.Copy(request)
+        destinationObject=dest_path,
+        destinationKmsKeyName=dest_kms_key_name,
+        maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
+    start_time = time.time()
+    response = self.client.objects.Rewrite(request)
 
 Review comment:
   It depends on how you look at it. The docs for the response's `done` field 
say: "true if the copy is finished; otherwise, false if the copy is in 
progress". 'In progress' makes it sound like an asynchronous operation, but 
from the caller's perspective, Rewrite() is a blocking operation.
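
To make the two readings concrete, here is a minimal, self-contained sketch of the calling pattern. It does not use the real GCS client: `FakeObjectsClient`, `RewriteResponse`, and `copy_with_rewrite` are hypothetical names invented for illustration, and the fake only simulates chunked copying. Each `rewrite()` call blocks and returns a response, and the caller keeps calling with the returned token until `done` is true or the timeout elapses.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class RewriteResponse:
    """Hypothetical stand-in for the fields of a rewrite response."""
    done: bool
    rewrite_token: Optional[str]
    total_bytes_rewritten: int


class FakeObjectsClient:
    """Hypothetical in-memory stand-in for a GCS objects client."""

    def __init__(self, object_size, max_bytes_per_call):
        self.object_size = object_size
        self.max_bytes_per_call = max_bytes_per_call
        self.copied = 0
        self.calls = 0

    def rewrite(self, rewrite_token=None):
        # Each call blocks until one more chunk is copied, then returns.
        self.calls += 1
        self.copied = min(self.object_size,
                          self.copied + self.max_bytes_per_call)
        done = self.copied >= self.object_size
        token = None if done else "token-%d" % self.calls
        return RewriteResponse(done, token, self.copied)


def copy_with_rewrite(client, timeout_secs=3600):
    """Calls rewrite() repeatedly until the copy reports done."""
    start_time = time.time()
    response = client.rewrite()
    while not response.done:
        # The copy as a whole is still "in progress", even though every
        # individual call has already returned.
        if time.time() - start_time > timeout_secs:
            raise TimeoutError(
                "rewrite did not finish within %s seconds" % timeout_secs)
        response = client.rewrite(rewrite_token=response.rewrite_token)
    return response.total_bytes_rewritten


if __name__ == "__main__":
    client = FakeObjectsClient(object_size=10, max_bytes_per_call=4)
    print(copy_with_rewrite(client), client.calls)
```

In this sketch, "in progress" describes the state of the overall copy between calls, while each individual call is synchronous from the caller's point of view.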

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 174729)

> Add Cloud KMS support to GCS copies
> -----------------------------------
>
>                 Key: BEAM-5959
>                 URL: https://issues.apache.org/jira/browse/BEAM-5959
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp, sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Udi Meiri
>            Priority: Major
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> The Beam SDK currently uses the CopyTo GCS API call, which doesn't support 
> copying objects that use Customer Managed Encryption Keys (CMEK).
> CMEKs are managed in Cloud KMS.
> Items (for Java and Python SDKs):
> - Update clients to versions that support KMS keys.
> - Change copyTo API calls to use rewriteTo (Python - directly, Java - 
> possibly convert copyTo API call to use client library)
> - Add unit tests.
> - Add basic tests (DirectRunner and GCS buckets with CMEK).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
