chamikaramj commented on a change in pull request #15301:
URL: https://github.com/apache/beam/pull/15301#discussion_r686189167



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
##########
@@ -770,12 +770,10 @@ final void moveToOutputFiles(
       }
       // During a failure case, files may have been deleted in an earlier step. Thus
       // we ignore missing files here.
-      FileSystems.rename(
-          srcFiles,
-          dstFiles,
-          StandardMoveOptions.IGNORE_MISSING_FILES,
-          StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
-      removeTemporaryFiles(srcFiles);

Review comment:
       I think we still need removeTemporaryFiles() to remove the temporary directory in the batch case.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
##########
@@ -770,12 +770,10 @@ final void moveToOutputFiles(
       }
       // During a failure case, files may have been deleted in an earlier step. Thus
       // we ignore missing files here.
-      FileSystems.rename(
-          srcFiles,
-          dstFiles,
-          StandardMoveOptions.IGNORE_MISSING_FILES,

Review comment:
       We should make sure that not passing these options does not end up being a
       regression for other file systems (S3, HDFS, local, etc.).
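
To make the expected behavior concrete, here is a minimal sketch of what the two options are read to mean, written against plain java.nio rather than Beam. The class and method names are hypothetical, and the skip rule is simplified to "destination exists" (an assumption; the real FileSystems logic may compare more than existence). A check along these lines, run per file system, is one way to catch a regression.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

/** Hypothetical illustration of the option semantics; this is not Beam's FileSystems code. */
final class RenameSemanticsSketch {

  /**
   * Renames each source to the destination at the same index and returns how many files were
   * actually moved. Both flags are simplified stand-ins for the StandardMoveOptions values.
   */
  static int rename(
      List<Path> srcs, List<Path> dsts, boolean ignoreMissing, boolean skipIfDestinationExists)
      throws IOException {
    if (srcs.size() != dsts.size()) {
      throw new IllegalArgumentException("Source and destination lists differ in size");
    }
    int moved = 0;
    for (int i = 0; i < srcs.size(); i++) {
      Path src = srcs.get(i);
      Path dst = dsts.get(i);
      if (!Files.exists(src)) {
        if (ignoreMissing) {
          continue; // A retry may find that an earlier attempt already moved this file.
        }
        throw new NoSuchFileException(src.toString());
      }
      if (skipIfDestinationExists && Files.exists(dst)) {
        continue; // Assumption: the source is left in place for a separate cleanup step.
      }
      Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
      moved++;
    }
    return moved;
  }
}
```

With both flags set, a repeated call over the same lists is a no-op instead of an error, which is the property a retried finalize step relies on.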

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
##########
@@ -317,6 +317,10 @@ public static void rename(
       return;
     }
 
+    // XXX: Instead of filtering here we should pass options on to filesystem as it will be able to

Review comment:
       Is this a TODO?

##########
File path: sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
##########
@@ -681,21 +734,77 @@ public void onSuccess(RewriteResponse rewriteResponse, HttpHeaders responseHeade
 
     @Override
     public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
-      readyToEnqueue = false;
-      throw new IOException(String.format("Error trying to rewrite %s to %s: %s", from, to, e));
+      if (e.getCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
+        if (move) {
+          // Treat the missing source as a successful move. We don't verify the destination file
+          // exists as it may have subsequently been moved by something else.
+          readyToEnqueue = false;
+          lastError = null;
+        } else {
+          throw new FileNotFoundException(from.toString());
+        }
+      } else {
+        lastError = e;
+        readyToEnqueue = true;
+      }
     }
   }
 
   public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
       throws IOException {
-    LinkedList<RewriteOp> rewrites = makeRewriteOps(srcFilenames, destFilenames);
-    while (rewrites.size() > 0) {
-      executeBatches(makeCopyBatches(rewrites));
+    rewrite(srcFilenames, destFilenames, false);
+  }
+
+  public void rename(Iterable<String> srcFilenames, Iterable<String> destFilenames)
+      throws IOException {
+    rewrite(srcFilenames, destFilenames, true);
+  }
+
+  private void rewrite(Iterable<String> srcFilenames, Iterable<String> destFilenames, boolean move)
+      throws IOException {
+    LinkedList<RewriteOp> rewrites = makeRewriteOps(srcFilenames, destFilenames, move);
+    org.apache.beam.sdk.util.BackOff backoff = BACKOFF_FACTORY.backoff();
+    while (true) {
+      List<BatchRequest> batches = makeCopyBatches(rewrites); // Removes completed rewrite ops.
+      if (batches.isEmpty()) {
+        break;
+      }
+      RewriteOp sampleErrorOp =
+          rewrites.stream().filter(op -> op.getLastError() != null).findFirst().orElse(null);

Review comment:
       Should we skip only 404 failures here? If we ignore other errors and
       end up not copying some files, it could lead to data loss.
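
A minimal sketch of the policy this comment seems to ask for, under stated assumptions: the class and method names are hypothetical, the HTTP status codes are simulated inputs rather than real GCS responses, and none of this is the GcsUtil implementation. The point is that a 404 on a move is the only error treated as success; anything else is retried and, if it never clears, surfaces as an IOException instead of being dropped.

```java
import java.io.FileNotFoundException;
import java.io.IOException;

/** Hypothetical sketch, not GcsUtil: only a 404 on the source is ever treated as benign. */
final class RewriteRetrySketch {
  static final int OK = 200;
  static final int NOT_FOUND = 404;

  enum Outcome {
    DONE,
    RETRY
  }

  /** Classifies a failed attempt; a missing source counts as success only for a move. */
  static Outcome classify(int httpCode, boolean move, String from) throws FileNotFoundException {
    if (httpCode == NOT_FOUND) {
      if (move) {
        return Outcome.DONE; // Source already gone: assume an earlier attempt moved it.
      }
      throw new FileNotFoundException(from);
    }
    return Outcome.RETRY; // Every other error stays pending and must not be dropped.
  }

  /**
   * Walks the simulated status codes of successive attempts for one file and returns the number
   * of attempts used. Throws once the attempts run out, so a non-404 failure is never silent.
   */
  static int attemptsUntilDone(int[] attemptCodes, boolean move, String from) throws IOException {
    int lastCode = -1;
    for (int i = 0; i < attemptCodes.length; i++) {
      lastCode = attemptCodes[i];
      if (lastCode == OK || classify(lastCode, move, from) == Outcome.DONE) {
        return i + 1;
      }
    }
    throw new IOException(
        String.format("Error trying to rewrite %s: giving up after HTTP %d", from, lastCode));
  }

  public static void main(String[] args) throws IOException {
    // A transient 503 is retried; a 404 during a move completes immediately.
    System.out.println(attemptsUntilDone(new int[] {503, 200}, false, "gs://bucket/tmp-0"));
    System.out.println(attemptsUntilDone(new int[] {404}, true, "gs://bucket/tmp-1"));
  }
}
```

In the real loop the attempt budget would come from the backoff rather than an array length; the sketch only fixes which errors are allowed to end the loop quietly.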

##########
File path: sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
##########
@@ -625,29 +625,76 @@ private static void executeBatches(List<BatchRequest> batches) throws IOExceptio
    * round of enqueue() and execute is required. Repeat until getReadyToEnqueue() returns false.
    */
   class RewriteOp extends JsonBatchCallback<RewriteResponse> {
-    private GcsPath from;
-    private GcsPath to;
+    private final GcsPath from;
+    private final GcsPath to;
+    private final boolean move;
     private boolean readyToEnqueue;
+    private boolean performDelete;
+    private GoogleJsonError lastError;
     @VisibleForTesting Storage.Objects.Rewrite rewriteRequest;
 
     public boolean getReadyToEnqueue() {
       return readyToEnqueue;
     }
 
+    public GoogleJsonError getLastError() {
+      return lastError;
+    }
+
+    public GcsPath getFrom() {
+      return from;
+    }
+
+    public GcsPath getTo() {
+      return to;
+    }
+
     public void enqueue(BatchRequest batch) throws IOException {
       if (!readyToEnqueue) {
         throw new IOException(
             String.format(
                 "Invalid state for Rewrite, from=%s, to=%s, readyToEnqueue=%s",
                 from, to, readyToEnqueue));
       }
-      rewriteRequest.queue(batch, this);
-      readyToEnqueue = false;
+      LOG.info("XXX enqueue op");
+      if (performDelete) {
+        Storage.Objects.Delete deleteRequest =
+            storageClient.objects().delete(from.getBucket(), from.getObject());
+        deleteRequest.queue(
+            batch,
+            new JsonBatchCallback<Void>() {
+              @Override
+              public void onSuccess(Void obj, HttpHeaders responseHeaders) {
+                LOG.debug("Successfully deleted {} after moving to {}", from, to);
+                readyToEnqueue = false;
+                lastError = null;
+              }
+
+              @Override
+              public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders)
+                  throws IOException {
+                if (e.getCode() == 404) {
+                  LOG.info(
+                      "Ignoring failed deletion of moved file {} which already does not exist: {}",
+                      from,
+                      e);
+                  readyToEnqueue = false;
+                  lastError = null;
+                } else {
+                  readyToEnqueue = true;
+                  lastError = e;
+                }
+              }
+            });
+      } else {
+        rewriteRequest.queue(batch, this);
+      }
     }
 
-    public RewriteOp(GcsPath from, GcsPath to) throws IOException {
+    public RewriteOp(GcsPath from, GcsPath to, boolean move) throws IOException {

Review comment:
       I believe rewrite means copy, so this option muddles the semantics a bit.



