scwhittle commented on a change in pull request #15301:
URL: https://github.com/apache/beam/pull/15301#discussion_r686250554



##########
File path: sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
##########
@@ -681,21 +734,77 @@ public void onSuccess(RewriteResponse rewriteResponse, HttpHeaders responseHeade
 
     @Override
     public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
-      readyToEnqueue = false;
-      throw new IOException(String.format("Error trying to rewrite %s to %s: %s", from, to, e));
+      if (e.getCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
+        if (move) {
+          // Treat the missing source as a successful move. We don't verify the destination file
+          // exists as it may have subsequently been moved by something else.
+          readyToEnqueue = false;
+          lastError = null;
+        } else {
+          throw new FileNotFoundException(from.toString());
+        }
+      } else {
+        lastError = e;
+        readyToEnqueue = true;
+      }
     }
   }
 
   public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
       throws IOException {
-    LinkedList<RewriteOp> rewrites = makeRewriteOps(srcFilenames, destFilenames);
-    while (rewrites.size() > 0) {
-      executeBatches(makeCopyBatches(rewrites));
+    rewrite(srcFilenames, destFilenames, false);
+  }
+
+  public void rename(Iterable<String> srcFilenames, Iterable<String> destFilenames)
+      throws IOException {
+    rewrite(srcFilenames, destFilenames, true);
+  }
+
+  private void rewrite(Iterable<String> srcFilenames, Iterable<String> destFilenames, boolean move)
+      throws IOException {
+    LinkedList<RewriteOp> rewrites = makeRewriteOps(srcFilenames, destFilenames, move);
+    org.apache.beam.sdk.util.BackOff backoff = BACKOFF_FACTORY.backoff();
+    while (true) {
+      List<BatchRequest> batches = makeCopyBatches(rewrites); // Removes completed rewrite ops.
+      if (batches.isEmpty()) {
+        break;
+      }
+      RewriteOp sampleErrorOp =
+          rewrites.stream().filter(op -> op.getLastError() != null).findFirst().orElse(null);

Review comment:
       We skip the 404 errors in the per-op handling above, so any error seen here is one we want to retry.
   
   We are not skipping it: we either retry or, once we hit the backoff limit, propagate one of the errors.




-- 
This is an automated message from the Apache Git Service.