scwhittle commented on a change in pull request #15301:
URL: https://github.com/apache/beam/pull/15301#discussion_r686811469



##########
File path: 
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
##########
@@ -625,29 +625,76 @@ private static void executeBatches(List<BatchRequest> 
batches) throws IOExceptio
    * round of enqueue() and execute is required. Repeat until 
getReadyToEnqueue() returns false.
    */
   class RewriteOp extends JsonBatchCallback<RewriteResponse> {
-    private GcsPath from;
-    private GcsPath to;
+    private final GcsPath from;
+    private final GcsPath to;
+    private final boolean move;
     private boolean readyToEnqueue;
+    private boolean performDelete;
+    private GoogleJsonError lastError;
     @VisibleForTesting Storage.Objects.Rewrite rewriteRequest;
 
     public boolean getReadyToEnqueue() {
       return readyToEnqueue;
     }
 
+    public GoogleJsonError getLastError() {
+      return lastError;
+    }
+
+    public GcsPath getFrom() {
+      return from;
+    }
+
+    public GcsPath getTo() {
+      return to;
+    }
+
     public void enqueue(BatchRequest batch) throws IOException {
       if (!readyToEnqueue) {
         throw new IOException(
             String.format(
                 "Invalid state for Rewrite, from=%s, to=%s, readyToEnqueue=%s",
                 from, to, readyToEnqueue));
       }
-      rewriteRequest.queue(batch, this);
-      readyToEnqueue = false;
+      LOG.info("XXX enqueue op");
+      if (performDelete) {
+        Storage.Objects.Delete deleteRequest =
+            storageClient.objects().delete(from.getBucket(), from.getObject());
+        deleteRequest.queue(
+            batch,
+            new JsonBatchCallback<Void>() {
+              @Override
+              public void onSuccess(Void obj, HttpHeaders responseHeaders) {
+                LOG.debug("Successfully deleted {} after moving to {}", from, 
to);
+                readyToEnqueue = false;
+                lastError = null;
+              }
+
+              @Override
+              public void onFailure(GoogleJsonError e, HttpHeaders 
responseHeaders)
+                  throws IOException {
+                if (e.getCode() == 404) {
+                  LOG.info(
+                      "Ignoring failed deletion of moved file {} which already 
does not exist: {}",
+                      from,
+                      e);
+                  readyToEnqueue = false;
+                  lastError = null;
+                } else {
+                  readyToEnqueue = true;
+                  lastError = e;
+                }
+              }
+            });
+      } else {
+        rewriteRequest.queue(batch, this);
+      }
     }
 
-    public RewriteOp(GcsPath from, GcsPath to) throws IOException {
+    public RewriteOp(GcsPath from, GcsPath to, boolean move) throws 
IOException {

Review comment:
       Renamed the option to make what it does clearer. It is convenient to 
have a single op shared by copy and rename. Let me know if you have other 
naming ideas.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to