This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 74fc46c404d GCS client library migration in Java SDK - part 2b (#37592)
74fc46c404d is described below

commit 74fc46c404d0e76d89f01d144d4038ba94dd5677
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Feb 18 15:26:33 2026 -0500

    GCS client library migration in Java SDK - part 2b (#37592)
    
    * Add copy() and remove() for GcsUtil V2.
    
    * Add tests and modify copy and remove to take strategies
    
    * Add deprecated annotations to V1 copy and remove.
    
    * Refactor MissingStrategy and OverwriteStrategy enums. Add rewriteHelper() 
and move().
    
    * Add rename tests and refactor copy and remove tests.
    
    * Refactor rename
    
    * Add experimental annotations to the new copy, remove and rename
    
    * Remove unused import.
    
    * Fix style.
    
    * Trigger post commit java for the integration tests of GcsUtil.
    
    * Revise according to reviews.
---
 .github/trigger_files/beam_PostCommit_Java.json    |   2 +-
 .../beam/sdk/extensions/gcp/util/GcsUtil.java      |  72 +++++
 .../beam/sdk/extensions/gcp/util/GcsUtilV2.java    | 161 +++++++++++-
 .../gcp/util/GcsUtilParameterizedIT.java           | 290 +++++++++++++++++++++
 .../sdk/extensions/gcp/util/gcsfs/GcsPathTest.java |   2 +
 5 files changed, 525 insertions(+), 2 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Java.json 
b/.github/trigger_files/beam_PostCommit_Java.json
index 756b765e59e..7b4c1ba6702 100644
--- a/.github/trigger_files/beam_PostCommit_Java.json
+++ b/.github/trigger_files/beam_PostCommit_Java.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-  "modification": 4
+  "modification": 6
 } 
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index a2fdf24e9fb..33399ef87b6 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -37,16 +37,21 @@ import java.nio.channels.SeekableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.BlobResult;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 public class GcsUtil {
@@ -433,12 +438,65 @@ public class GcsUtil {
     delegate.copy(srcFilenames, destFilenames);
   }
 
+  /** experimental api. */
+  public void copyV2(Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths) 
throws IOException {
+    copy(srcPaths, dstPaths, OverwriteStrategy.SAFE_OVERWRITE);
+  }
+
+  /** experimental api. */
+  public void copy(
+      Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths, 
OverwriteStrategy strategy)
+      throws IOException {
+    if (delegateV2 != null) {
+      delegateV2.copy(srcPaths, dstPaths, strategy);
+    } else {
+      throw new IOException("GcsUtil V2 not initialized.");
+    }
+  }
+
   public void rename(
       Iterable<String> srcFilenames, Iterable<String> destFilenames, 
MoveOptions... moveOptions)
       throws IOException {
     delegate.rename(srcFilenames, destFilenames, moveOptions);
   }
 
+  /** experimental api. */
+  public void renameV2(
+      Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths, MoveOptions... 
moveOptions)
+      throws IOException {
+    Set<MoveOptions> moveOptionSet = Sets.newHashSet(moveOptions);
+    final MissingStrategy srcMissing;
+    final OverwriteStrategy dstOverwrite;
+
+    if (moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES)) {
+      srcMissing = MissingStrategy.SKIP_IF_MISSING;
+    } else {
+      srcMissing = MissingStrategy.FAIL_IF_MISSING;
+    }
+
+    if 
(moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS)) {
+      dstOverwrite = OverwriteStrategy.SKIP_IF_EXISTS;
+    } else {
+      dstOverwrite = OverwriteStrategy.SAFE_OVERWRITE;
+    }
+
+    rename(srcPaths, dstPaths, srcMissing, dstOverwrite);
+  }
+
+  /** experimental api. */
+  public void rename(
+      Iterable<GcsPath> srcPaths,
+      Iterable<GcsPath> dstPaths,
+      MissingStrategy srcMissing,
+      OverwriteStrategy dstOverwrite)
+      throws IOException {
+    if (delegateV2 != null) {
+      delegateV2.move(srcPaths, dstPaths, srcMissing, dstOverwrite);
+    } else {
+      throw new IOException("GcsUtil V2 not initialized.");
+    }
+  }
+
   @VisibleForTesting
   @SuppressWarnings("JdkObsolete") // for LinkedList
   java.util.LinkedList<GcsUtilV1.RewriteOp> makeRewriteOps(
@@ -469,6 +527,20 @@ public class GcsUtil {
     delegate.remove(filenames);
   }
 
+  /** experimental api. */
+  public void removeV2(Iterable<GcsPath> paths) throws IOException {
+    remove(paths, MissingStrategy.SKIP_IF_MISSING);
+  }
+
+  /** experimental api. */
+  public void remove(Iterable<GcsPath> paths, MissingStrategy strategy) throws 
IOException {
+    if (delegateV2 != null) {
+      delegateV2.remove(paths, strategy);
+    } else {
+      throw new IOException("GcsUtil V2 not initialized.");
+    }
+  }
+
   @SuppressFBWarnings("NM_CLASS_NOT_EXCEPTION")
   public static class StorageObjectOrIOException {
     final GcsUtilV1.StorageObjectOrIOException delegate;
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
index a2df45511c9..b00b7ce0d72 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
@@ -18,19 +18,24 @@
 package org.apache.beam.sdk.extensions.gcp.util;
 
 import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.gax.paging.Page;
 import com.google.auto.value.AutoValue;
 import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Bucket;
 import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.CopyWriter;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.Storage.BlobField;
 import com.google.cloud.storage.Storage.BlobGetOption;
 import com.google.cloud.storage.Storage.BlobListOption;
 import com.google.cloud.storage.Storage.BucketField;
 import com.google.cloud.storage.Storage.BucketGetOption;
+import com.google.cloud.storage.Storage.CopyRequest;
 import com.google.cloud.storage.StorageBatch;
 import com.google.cloud.storage.StorageBatchResult;
 import com.google.cloud.storage.StorageException;
@@ -71,18 +76,27 @@ class GcsUtilV2 {
   /** Maximum number of requests permitted in a GCS batch request. */
   private static final int MAX_REQUESTS_PER_BATCH = 100;
 
+  /**
+   * Limit the number of bytes Cloud Storage will attempt to copy before 
responding to an individual
+   * request. If you see Read Timeout errors, try reducing this value.
+   */
+  private static final long MEGABYTES_COPIED_PER_CHUNK = 2048L;
+
   GcsUtilV2(PipelineOptions options) {
     String projectId = options.as(GcpOptions.class).getProject();
     storage = 
StorageOptions.newBuilder().setProjectId(projectId).build().getService();
   }
 
   @SuppressWarnings({
-    "nullness" // For Creating AccessDeniedException and 
FileAlreadyExistsException with null.
+    "nullness" // For Creating AccessDeniedException FileNotFoundException, and
+    // FileAlreadyExistsException with null.
   })
   private IOException translateStorageException(GcsPath gcsPath, 
StorageException e) {
     switch (e.getCode()) {
       case 403:
         return new AccessDeniedException(gcsPath.toString(), null, 
e.getMessage());
+      case 404:
+        return new FileNotFoundException(e.getMessage());
       case 409:
         return new FileAlreadyExistsException(gcsPath.toString(), null, 
e.getMessage());
       default:
@@ -259,6 +273,151 @@ class GcsUtilV2 {
     return results;
   }
 
+  public enum MissingStrategy {
+    FAIL_IF_MISSING,
+    SKIP_IF_MISSING,
+  }
+
+  public void remove(Iterable<GcsPath> paths, MissingStrategy strategy) throws 
IOException {
+    for (List<GcsPath> pathPartition :
+        Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {
+
+      // Create a new empty batch every time
+      StorageBatch batch = storage.batch();
+      List<StorageBatchResult<Boolean>> batchResultFutures = new ArrayList<>();
+
+      for (GcsPath path : pathPartition) {
+        batchResultFutures.add(batch.delete(path.getBucket(), 
path.getObject()));
+      }
+      batch.submit();
+
+      for (int i = 0; i < batchResultFutures.size(); i++) {
+        StorageBatchResult<Boolean> future = batchResultFutures.get(i);
+        try {
+          Boolean deleted = future.get();
+          if (!deleted) {
+            if (strategy == MissingStrategy.FAIL_IF_MISSING) {
+              throw new FileNotFoundException(
+                  String.format(
+                      "The specified file does not exist: %s", 
pathPartition.get(i).toString()));
+            } else {
+              LOG.warn("Ignoring failed deletion on file {}.", 
pathPartition.get(i).toString());
+            }
+          }
+        } catch (StorageException e) {
+          throw translateStorageException(pathPartition.get(i), e);
+        }
+      }
+    }
+  }
+
+  public enum OverwriteStrategy {
+    FAIL_IF_EXISTS, // Fail if target exists
+    SKIP_IF_EXISTS, // Skip if target exists
+    SAFE_OVERWRITE, // Overwrite only if the generation matches (atomic)
+    ALWAYS_OVERWRITE // Overwrite regardless of state
+  }
+
+  private void rewriteHelper(
+      Iterable<GcsPath> srcPaths,
+      Iterable<GcsPath> dstPaths,
+      boolean deleteSrc,
+      MissingStrategy srcMissing,
+      OverwriteStrategy dstOverwrite)
+      throws IOException {
+    List<GcsPath> srcList = Lists.newArrayList(srcPaths);
+    List<GcsPath> dstList = Lists.newArrayList(dstPaths);
+    checkArgument(
+        srcList.size() == dstList.size(),
+        "Number of source files %s must equal number of destination files %s",
+        srcList.size(),
+        dstList.size());
+
+    for (int i = 0; i < srcList.size(); i++) {
+      GcsPath srcPath = srcList.get(i);
+      GcsPath dstPath = dstList.get(i);
+      BlobId srcId = BlobId.of(srcPath.getBucket(), srcPath.getObject());
+      BlobId dstId = BlobId.of(dstPath.getBucket(), dstPath.getObject());
+
+      CopyRequest.Builder copyRequestBuilder =
+          CopyRequest.newBuilder()
+              .setSource(srcId)
+              .setMegabytesCopiedPerChunk(MEGABYTES_COPIED_PER_CHUNK);
+
+      if (dstOverwrite == OverwriteStrategy.ALWAYS_OVERWRITE) {
+        copyRequestBuilder.setTarget(dstId);
+      } else {
+        // FAIL_IF_EXISTS, SKIP_IF_EXISTS and SAFE_OVERWRITE require checking 
the target blob
+        BlobInfo existingTarget;
+        try {
+          existingTarget = storage.get(dstId);
+        } catch (StorageException e) {
+          throw translateStorageException(dstPath, e);
+        }
+
+        if (existingTarget == null) {
+          copyRequestBuilder.setTarget(dstId, 
Storage.BlobTargetOption.doesNotExist());
+        } else {
+          switch (dstOverwrite) {
+            case SKIP_IF_EXISTS:
+              LOG.warn("Ignoring rewriting from {} to {} because target 
exists.", srcPath, dstPath);
+              continue; // Skip to next file in for-loop
+
+            case SAFE_OVERWRITE:
+              copyRequestBuilder.setTarget(
+                  dstId, 
Storage.BlobTargetOption.generationMatch(existingTarget.getGeneration()));
+              break;
+
+            case FAIL_IF_EXISTS:
+              throw new FileAlreadyExistsException(
+                  srcPath.toString(),
+                  dstPath.toString(),
+                  "Target object already exists and strategy is 
FAIL_IF_EXISTS");
+            default:
+              throw new IllegalStateException("Unknown OverwriteStrategy: " + 
dstOverwrite);
+          }
+        }
+      }
+
+      try {
+        CopyWriter copyWriter = storage.copy(copyRequestBuilder.build());
+        copyWriter.getResult();
+
+        if (deleteSrc) {
+          if (!storage.delete(srcId)) {
+            // This may happen if the source file is deleted by another 
process after copy.
+            LOG.warn(
+                "Source file {} could not be deleted after move to {}. It may 
not have existed.",
+                srcPath,
+                dstPath);
+          }
+        }
+      } catch (StorageException e) {
+        if (e.getCode() == 404 && srcMissing == 
MissingStrategy.SKIP_IF_MISSING) {
+          LOG.warn(
+              "Ignoring rewriting from {} to {} because source does not 
exist.", srcPath, dstPath);
+          continue;
+        }
+        throw translateStorageException(srcPath, e);
+      }
+    }
+  }
+
+  public void copy(
+      Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths, 
OverwriteStrategy strategy)
+      throws IOException {
+    rewriteHelper(srcPaths, dstPaths, false, MissingStrategy.FAIL_IF_MISSING, 
strategy);
+  }
+
+  public void move(
+      Iterable<GcsPath> srcPaths,
+      Iterable<GcsPath> dstPaths,
+      MissingStrategy srcMissing,
+      OverwriteStrategy dstOverwrite)
+      throws IOException {
+    rewriteHelper(srcPaths, dstPaths, true, srcMissing, dstOverwrite);
+  }
+
   /** Get the {@link Bucket} from Cloud Storage path or propagates an 
exception. */
   public Bucket getBucket(GcsPath path, BucketGetOption... options) throws 
IOException {
     String bucketName = path.getBucket();
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
index db5097c9515..80ffd72924f 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
@@ -37,7 +37,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.io.fs.MoveOptions;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
@@ -297,4 +300,291 @@ public class GcsUtilParameterizedIT {
       }
     }
   }
+
+  private List<GcsPath> createTestBucketHelper(String bucketName) throws 
IOException {
+    final List<GcsPath> originPaths =
+        Arrays.asList(
+            
GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii.txt"),
+            
GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardiii.txt"));
+
+    final List<GcsPath> testPaths =
+        originPaths.stream()
+            .map(o -> GcsPath.fromComponents(bucketName, o.getObject()))
+            .collect(Collectors.toList());
+
+    // create bucket and copy some initial files into it
+    if (experiment.equals("use_gcsutil_v2")) {
+      gcsUtil.createBucket(BucketInfo.of(bucketName));
+
+      gcsUtil.copyV2(originPaths, testPaths);
+    } else {
+      GcsOptions gcsOptions = options.as(GcsOptions.class);
+      gcsUtil.createBucket(gcsOptions.getProject(), new 
Bucket().setName(bucketName));
+
+      final List<String> originList =
+          originPaths.stream().map(o -> 
o.toString()).collect(Collectors.toList());
+      final List<String> testList =
+          testPaths.stream().map(o -> 
o.toString()).collect(Collectors.toList());
+      gcsUtil.copy(originList, testList);
+    }
+
+    return testPaths;
+  }
+
+  private void tearDownTestBucketHelper(String bucketName) {
+    try {
+      // use "**" in the pattern to match any characters including "/".
+      final List<GcsPath> paths =
+          gcsUtil.expand(GcsPath.fromUri(String.format("gs://%s/**", 
bucketName)));
+      if (experiment.equals("use_gcsutil_v2")) {
+        gcsUtil.remove(paths, MissingStrategy.SKIP_IF_MISSING);
+        gcsUtil.removeBucket(BucketInfo.of(bucketName));
+      } else {
+        
gcsUtil.remove(paths.stream().map(GcsPath::toString).collect(Collectors.toList()));
+        gcsUtil.removeBucket(new Bucket().setName(bucketName));
+      }
+    } catch (IOException e) {
+      System.err.println(
+          "Error during tear down of test bucket " + bucketName + ": " + 
e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCopy() throws IOException {
+    final String existingBucket = "apache-beam-temp-bucket-12345";
+    final String nonExistentBucket = "my-random-test-bucket-12345";
+
+    try {
+      final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket);
+      final List<GcsPath> dstPaths =
+          srcPaths.stream()
+              .map(o -> GcsPath.fromComponents(existingBucket, o.getObject() + 
".bak"))
+              .collect(Collectors.toList());
+      final List<GcsPath> errPaths =
+          srcPaths.stream()
+              .map(o -> GcsPath.fromComponents(nonExistentBucket, 
o.getObject()))
+              .collect(Collectors.toList());
+
+      assertNotExists(dstPaths.get(0));
+      assertNotExists(dstPaths.get(1));
+
+      if (experiment.equals("use_gcsutil_v2")) {
+        // (1) when the target files do not exist
+        gcsUtil.copyV2(srcPaths, dstPaths);
+        assertExists(dstPaths.get(0));
+        assertExists(dstPaths.get(1));
+
+        // (2) when the target files exist
+        // (2a) no exception on SAFE_OVERWRITE, ALWAYS_OVERWRITE, 
SKIP_IF_EXISTS
+        gcsUtil.copyV2(srcPaths, dstPaths);
+        gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.ALWAYS_OVERWRITE);
+        gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.SKIP_IF_EXISTS);
+
+        // (2b) raise exception on FAIL_IF_EXISTS
+        assertThrows(
+            FileAlreadyExistsException.class,
+            () -> gcsUtil.copy(srcPaths, dstPaths, 
OverwriteStrategy.FAIL_IF_EXISTS));
+
+        // (3) raise exception when the target bucket is nonexistent.
+        assertThrows(FileNotFoundException.class, () -> 
gcsUtil.copyV2(srcPaths, errPaths));
+
+        // (4) raise exception when the source files are nonexistent.
+        assertThrows(FileNotFoundException.class, () -> 
gcsUtil.copyV2(errPaths, dstPaths));
+      } else {
+        final List<String> srcList =
+            srcPaths.stream().map(o -> 
o.toString()).collect(Collectors.toList());
+        final List<String> dstList =
+            dstPaths.stream().map(o -> 
o.toString()).collect(Collectors.toList());
+        final List<String> errList =
+            errPaths.stream().map(o -> 
o.toString()).collect(Collectors.toList());
+
+        // (1) when the target files do not exist
+        gcsUtil.copy(srcList, dstList);
+        assertExists(dstPaths.get(0));
+        assertExists(dstPaths.get(1));
+
+        // (2) when the target files exist, no exception
+        gcsUtil.copy(srcList, dstList);
+
+        // (3) raise exception when the target bucket is nonexistent.
+        assertThrows(FileNotFoundException.class, () -> gcsUtil.copy(srcList, 
errList));
+
+        // (4) raise exception when the source files are nonexistent.
+        assertThrows(FileNotFoundException.class, () -> gcsUtil.copy(errList, 
dstList));
+      }
+    } finally {
+      tearDownTestBucketHelper(existingBucket);
+    }
+  }
+
+  @Test
+  public void testRemove() throws IOException {
+    final String existingBucket = "apache-beam-temp-bucket-12345";
+    final String nonExistentBucket = "my-random-test-bucket-12345";
+
+    try {
+      final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket);
+      final List<GcsPath> errPaths =
+          srcPaths.stream()
+              .map(o -> GcsPath.fromComponents(nonExistentBucket, 
o.getObject()))
+              .collect(Collectors.toList());
+
+      assertExists(srcPaths.get(0));
+      assertExists(srcPaths.get(1));
+
+      if (experiment.equals("use_gcsutil_v2")) {
+        // (1) when the files to remove exist
+        gcsUtil.removeV2(srcPaths);
+        assertNotExists(srcPaths.get(0));
+        assertNotExists(srcPaths.get(1));
+
+        // (2) when the files to remove have been deleted
+        // (2a) no exception on SKIP_IF_MISSING
+        gcsUtil.removeV2(srcPaths);
+        gcsUtil.remove(srcPaths, MissingStrategy.SKIP_IF_MISSING);
+
+        // (2b) raise exception on FAIL_IF_MISSING
+        assertThrows(
+            FileNotFoundException.class,
+            () -> gcsUtil.remove(srcPaths, MissingStrategy.FAIL_IF_MISSING));
+
+        // (3) when the files are from a nonexistent bucket
+        // (3a) no exception on SKIP_IF_MISSING
+        gcsUtil.removeV2(errPaths);
+        gcsUtil.remove(errPaths, MissingStrategy.SKIP_IF_MISSING);
+
+        // (3b) raise exception on FAIL_IF_MISSING
+        assertThrows(
+            FileNotFoundException.class,
+            () -> gcsUtil.remove(errPaths, MissingStrategy.FAIL_IF_MISSING));
+      } else {
+        final List<String> srcList =
+            srcPaths.stream().map(o -> 
o.toString()).collect(Collectors.toList());
+        final List<String> errList =
+            errPaths.stream().map(o -> 
o.toString()).collect(Collectors.toList());
+
+        // (1) when the files to remove exist
+        gcsUtil.remove(srcList);
+        assertNotExists(srcPaths.get(0));
+        assertNotExists(srcPaths.get(1));
+
+        // (2) when the files to remove have been deleted, no exception
+        gcsUtil.remove(srcList);
+
+        // (3) when the files are from a nonexistent bucket, no exception
+        gcsUtil.remove(errList);
+      }
+    } finally {
+      tearDownTestBucketHelper(existingBucket);
+    }
+  }
+
+  @Test
+  public void testRename() throws IOException {
+    final String existingBucket = "apache-beam-temp-bucket-12345";
+    final String nonExistentBucket = "my-random-test-bucket-12345";
+
+    try {
+      final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket);
+      final List<GcsPath> tmpPaths =
+          srcPaths.stream()
+              .map(o -> GcsPath.fromComponents(existingBucket, "tmp/" + 
o.getObject()))
+              .collect(Collectors.toList());
+      final List<GcsPath> dstPaths =
+          srcPaths.stream()
+              .map(o -> GcsPath.fromComponents(existingBucket, o.getObject() + 
".bak"))
+              .collect(Collectors.toList());
+      final List<GcsPath> errPaths =
+          srcPaths.stream()
+              .map(o -> GcsPath.fromComponents(nonExistentBucket, 
o.getObject()))
+              .collect(Collectors.toList());
+
+      assertNotExists(dstPaths.get(0));
+      assertNotExists(dstPaths.get(1));
+      if (experiment.equals("use_gcsutil_v2")) {
+        // Make a copy of sources
+        gcsUtil.copyV2(srcPaths, tmpPaths);
+
+        // (1) when the source files exist and target files do not
+        gcsUtil.renameV2(tmpPaths, dstPaths);
+        assertNotExists(tmpPaths.get(0));
+        assertNotExists(tmpPaths.get(1));
+        assertExists(dstPaths.get(0));
+        assertExists(dstPaths.get(1));
+
+        // (2) when the source files do not exist
+        // (2a) no exception if IGNORE_MISSING_FILES is set
+        gcsUtil.renameV2(errPaths, dstPaths, 
MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+
+        // (2b) raise exception if IGNORE_MISSING_FILES is not set
+        assertThrows(FileNotFoundException.class, () -> 
gcsUtil.renameV2(errPaths, dstPaths));
+
+        // (3) when both source files and target files exist
+        gcsUtil.renameV2(
+            srcPaths, dstPaths, 
MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
+        gcsUtil.renameV2(srcPaths, dstPaths);
+      } else {
+        final List<String> srcList =
+            srcPaths.stream().map(o -> 
o.toString()).collect(Collectors.toList());
+        final List<String> tmpList =
+            tmpPaths.stream().map(o -> 
o.toString()).collect(Collectors.toList());
+        final List<String> dstList =
+            dstPaths.stream().map(o -> 
o.toString()).collect(Collectors.toList());
+        final List<String> errList =
+            errPaths.stream().map(o -> 
o.toString()).collect(Collectors.toList());
+
+        // Make a copy of sources
+        gcsUtil.copy(srcList, tmpList);
+
+        // (1) when the source files exist and target files do not
+        gcsUtil.rename(tmpList, dstList);
+        assertNotExists(tmpPaths.get(0));
+        assertNotExists(tmpPaths.get(1));
+        assertExists(dstPaths.get(0));
+        assertExists(dstPaths.get(1));
+
+        // (2) when the source files do not exist
+        // (2a) no exception if IGNORE_MISSING_FILES is set
+        gcsUtil.rename(errList, dstList, 
MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+
+        // (2b) raise exception if IGNORE_MISSING_FILES is not set
+        assertThrows(FileNotFoundException.class, () -> 
gcsUtil.rename(errList, dstList));
+
+        // (3) when both source files and target files exist
+        assertExists(srcPaths.get(0));
+        assertExists(srcPaths.get(1));
+        assertExists(dstPaths.get(0));
+        assertExists(dstPaths.get(1));
+
+        // There is a bug in V1 where SKIP_IF_DESTINATION_EXISTS is not 
honored.
+        gcsUtil.rename(
+            srcList, dstList, 
MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
+
+        assertNotExists(srcPaths.get(0)); // BUG! The renaming is supposed to 
be skipped
+        assertNotExists(srcPaths.get(1)); // BUG! The renaming is supposed to 
be skipped
+        // assertExists(srcPaths.get(0));
+        // assertExists(srcPaths.get(1));
+        assertExists(dstPaths.get(0));
+        assertExists(dstPaths.get(1));
+      }
+    } finally {
+      tearDownTestBucketHelper(existingBucket);
+    }
+  }
+
+  private void assertExists(GcsPath path) throws IOException {
+    if (experiment.equals("use_gcsutil_v2")) {
+      gcsUtil.getBlob(path);
+    } else {
+      gcsUtil.getObject(path);
+    }
+  }
+
+  private void assertNotExists(GcsPath path) throws IOException {
+    if (experiment.equals("use_gcsutil_v2")) {
+      assertThrows(FileNotFoundException.class, () -> gcsUtil.getBlob(path));
+    } else {
+      assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(path));
+    }
+  }
 }
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
index f344c6c2dba..9512fec312c 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
@@ -351,6 +351,7 @@ public class GcsPathTest {
 
   @Test
   public void testIsWildcard() {
+    assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/*")));
     assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo*")));
     assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo?")));
     assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo[a-z]")));
@@ -359,6 +360,7 @@ public class GcsPathTest {
 
   @Test
   public void testGetNonWildcardPrefix() {
+    assertEquals("gs://bucket/", 
GcsPath.getNonWildcardPrefix("gs://bucket/*"));
     assertEquals("gs://bucket/foo", 
GcsPath.getNonWildcardPrefix("gs://bucket/foo*"));
     assertEquals("gs://bucket/foo", 
GcsPath.getNonWildcardPrefix("gs://bucket/foo?"));
     assertEquals("gs://bucket/foo", 
GcsPath.getNonWildcardPrefix("gs://bucket/foo[a-z]"));

Reply via email to