shunping commented on code in PR #37592:
URL: https://github.com/apache/beam/pull/37592#discussion_r2823234875


##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java:
##########
@@ -259,6 +273,144 @@ public List<GcsPath> expand(GcsPath gcsPattern) throws 
IOException {
     return results;
   }
 
+  public enum MissingStrategy { // Reaction to a missing object; passed to remove() and rewriteHelper()
+    FAIL_IF_MISSING, // remove(): throw FileNotFoundException naming the path whose delete returned false
+    SKIP_IF_MISSING, // remove(): log a warning for that path and keep going
+  }
+
+  public void remove(Iterable<GcsPath> paths, MissingStrategy strategy) throws 
IOException { // Deletes the given paths in batches; strategy decides what happens when a delete returns false
+    for (List<GcsPath> pathPartition :
+        Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { // at most MAX_REQUESTS_PER_BATCH paths per batch
+
+      // Create a new empty batch every time
+      StorageBatch batch = storage.batch();
+      List<StorageBatchResult<Boolean>> batchResultFutures = new ArrayList<>(); // index i pairs with pathPartition.get(i)
+
+      for (GcsPath path : pathPartition) {
+        batchResultFutures.add(batch.delete(path.getBucket(), 
path.getObject()));
+      }
+      batch.submit(); // every delete of this partition is submitted before any result is read
+
+      for (int i = 0; i < batchResultFutures.size(); i++) {
+        StorageBatchResult<Boolean> future = batchResultFutures.get(i);
+        try {
+          Boolean deleted = future.get(); // a false result is handled below as "file does not exist"
+          if (!deleted) {
+            if (strategy == MissingStrategy.FAIL_IF_MISSING) {
+              throw new FileNotFoundException( // leaves remove(): remaining results and partitions are not examined
+                  String.format(
+                      "The specified file does not exist: %s", 
pathPartition.get(i).toString()));
+            } else {
+              LOG.warn("Ignoring failed deletion on file {}.", 
pathPartition.get(i).toString());
+            }
+          }
+        } catch (StorageException e) { // raised by future.get() for this path's delete
+          throw translateStorageException(pathPartition.get(i), e);
+        }
+      }
+    }
+  }
+
+  public enum OverwriteStrategy { // How rewriteHelper() treats a destination object that may already exist
+    FAIL_IF_EXISTS, // Throw FileAlreadyExistsException if the target exists
+    SKIP_IF_EXISTS, // Log a warning and skip this source/target pair if the target exists
+    SAFE_OVERWRITE, // Overwrite only if the target generation still matches the one read beforehand (atomic)
+    ALWAYS_OVERWRITE // Overwrite without looking up the target first
+  }
+
+  private void rewriteHelper(
+      Iterable<GcsPath> srcPaths,
+      Iterable<GcsPath> dstPaths,
+      boolean deleteSrc,
+      MissingStrategy srcMissing,
+      OverwriteStrategy dstOverwrite)
+      throws IOException {
+    List<GcsPath> srcList = Lists.newArrayList(srcPaths);
+    List<GcsPath> dstList = Lists.newArrayList(dstPaths);
+    checkArgument(
+        srcList.size() == dstList.size(),
+        "Number of source files %s must equal number of destination files %s",
+        srcList.size(),
+        dstList.size());
+
+    for (int i = 0; i < srcList.size(); i++) {
+      GcsPath srcPath = srcList.get(i);
+      GcsPath dstPath = dstList.get(i);
+      BlobId srcId = BlobId.of(srcPath.getBucket(), srcPath.getObject());
+      BlobId dstId = BlobId.of(dstPath.getBucket(), dstPath.getObject());
+
+      CopyRequest.Builder copyRequestBuilder =
+          CopyRequest.newBuilder()
+              .setSource(srcId)
+              .setMegabytesCopiedPerChunk(MEGABYTES_COPIED_PER_CHUNK);
+
+      if (dstOverwrite == OverwriteStrategy.ALWAYS_OVERWRITE) {
+        copyRequestBuilder.setTarget(dstId);
+      } else {
+        // FAIL_IF_EXISTS, SKIP_IF_EXISTS and SAFE_OVERWRITE require checking 
the target blob
+        BlobInfo existingTarget;
+        try {
+          existingTarget = storage.get(dstId);
+        } catch (StorageException e) {
+          throw translateStorageException(dstPath, e);
+        }
+
+        if (existingTarget == null) {
+          copyRequestBuilder.setTarget(dstId, 
Storage.BlobTargetOption.doesNotExist());
+        } else {
+          switch (dstOverwrite) {
+            case SKIP_IF_EXISTS:
+              LOG.warn("Ignoring rewriting from {} to {} because target 
exists.", srcPath, dstPath);
+              continue; // Skip to next file in for-loop
+
+            case SAFE_OVERWRITE:
+              copyRequestBuilder.setTarget(
+                  dstId, 
Storage.BlobTargetOption.generationMatch(existingTarget.getGeneration()));
+              break;
+
+            case FAIL_IF_EXISTS:
+            default:
+              throw new FileAlreadyExistsException(
+                  srcPath.toString(),
+                  dstPath.toString(),
+                  "Target object already exists and strategy is 
FAIL_IF_EXISTS");
+          }

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to