This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 72ce2074caf GCS client library migration in Java SDK - part 2 (#37502)
72ce2074caf is described below

commit 72ce2074cafef9b0af08ce5d28d12fbe0e8b99fe
Author: Shunping Huang <[email protected]>
AuthorDate: Thu Feb 12 22:45:18 2026 -0500

    GCS client library migration in Java SDK - part 2 (#37502)
    
    * Migrate fileSize() and getObject()
    
    * Move getNonWildcardPrefix() and isWildcard() from GcsUtilV1 to GcsPath
    
    * Migrate listObjects()
    
    * Migrate getBucket(), bucketAccessible(), verifyBucketAccessible().
    
    * Add tests and extract setUp function.
    
    * Migrate bucketOwner()
    
    * Migrate createBucket() and removeBucket()
    
    * Refactor exception handling.
    
    * Change returned value of listBlobs. Migrate expand().
    
    * Add deprecated annotation to some V1 apis. Refactor expand().
    
    * Add exception handling for listBlobs()
    
    * Add some more tests and comments.
    
    * Fetch only the required field to minimize data transfer.
    
    * Migrate getObjects() and refactor some method arguments.
    
    * Refactor translateStorageException
    
    * Control size of the batch for getBlobs.
    
    * Formatting
    
    * Fix GcpCoreApiSurfaceTest
    
    * Add java deps.
    
    * Change getBucketV2 to getBucketWithOptions. Fix GcpCoreApiSurfaceTest.
    
    * Put the new IT tests into a different file.
    
    * Revise according to gemini review.
    
    * Change argument from List to Iterable in getBlobs()
    
    * Remove redundant comments.
    
    * Minor change.
    
    * Revise according to review.
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   1 +
 .../google-cloud-platform-core/build.gradle        |   3 +
 .../beam/sdk/extensions/gcp/util/GcsUtil.java      | 111 ++++++-
 .../beam/sdk/extensions/gcp/util/GcsUtilV1.java    |  20 +-
 .../beam/sdk/extensions/gcp/util/GcsUtilV2.java    | 325 +++++++++++++++++++++
 .../sdk/extensions/gcp/util/gcsfs/GcsPath.java     |  15 +
 .../sdk/extensions/gcp/GcpCoreApiSurfaceTest.java  |  15 +-
 .../gcp/util/GcsUtilParameterizedIT.java           | 300 +++++++++++++++++++
 .../sdk/extensions/gcp/util/gcsfs/GcsPathTest.java |  20 ++
 9 files changed, 788 insertions(+), 22 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 774674b545e..df2db7df71b 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -766,6 +766,7 @@ class BeamModulePlugin implements Plugin<Project> {
         google_cloud_spanner_bom                    : 
"com.google.cloud:google-cloud-spanner-bom:$google_cloud_spanner_version",
         google_cloud_spanner                        : 
"com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom 
sets version
         google_cloud_spanner_test                   : 
"com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
+        google_cloud_storage                        : 
"com.google.cloud:google-cloud-storage", // google_cloud_platform_libraries_bom 
sets version
         google_cloud_tink                           : 
"com.google.crypto.tink:tink:1.19.0",
         google_cloud_vertexai                       : 
"com.google.cloud:google-cloud-vertexai", // 
google_cloud_platform_libraries_bom sets version
         google_code_gson                            : 
"com.google.code.gson:gson:$google_code_gson_version",
diff --git a/sdks/java/extensions/google-cloud-platform-core/build.gradle b/sdks/java/extensions/google-cloud-platform-core/build.gradle
index 8d21df50006..f1bfb63c7a3 100644
--- a/sdks/java/extensions/google-cloud-platform-core/build.gradle
+++ b/sdks/java/extensions/google-cloud-platform-core/build.gradle
@@ -39,9 +39,12 @@ dependencies {
   implementation library.java.vendored_guava_32_1_2_jre
   implementation project(path: ":sdks:java:core", configuration: "shadow")
   implementation project(path: ":runners:core-java")
+  implementation library.java.gax
   implementation library.java.google_http_client_gson
   implementation library.java.google_auth_library_oauth2_http
   implementation library.java.google_api_client
+  implementation library.java.google_cloud_core
+  implementation library.java.google_cloud_storage
   implementation library.java.bigdataoss_gcsio
   implementation library.java.bigdataoss_util
   implementation library.java.google_api_services_cloudresourcemanager
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index 220c08c6a7f..a2fdf24e9fb 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -20,11 +20,17 @@ package org.apache.beam.sdk.extensions.gcp.util;
 import com.google.api.client.http.HttpRequestInitializer;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Sleeper;
+import com.google.api.gax.paging.Page;
 import com.google.api.services.storage.Storage;
 import com.google.api.services.storage.model.Bucket;
 import com.google.api.services.storage.model.Objects;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.auth.Credentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage.BlobGetOption;
+import com.google.cloud.storage.Storage.BlobListOption;
+import com.google.cloud.storage.Storage.BucketGetOption;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.nio.channels.SeekableByteChannel;
@@ -34,6 +40,7 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.BlobResult;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.fs.MoveOptions;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -44,6 +51,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 
 public class GcsUtil {
   @VisibleForTesting GcsUtilV1 delegate;
+  @VisibleForTesting @Nullable GcsUtilV2 delegateV2;
 
   public static class GcsCountersOptions {
     final GcsUtilV1.GcsCountersOptions delegate;
@@ -96,11 +104,11 @@ public class GcsUtil {
   }
 
   public static String getNonWildcardPrefix(String globExp) {
-    return GcsUtilV1.getNonWildcardPrefix(globExp);
+    return GcsPath.getNonWildcardPrefix(globExp);
   }
 
   public static boolean isWildcard(GcsPath spec) {
-    return GcsUtilV1.isWildcard(spec);
+    return GcsPath.isWildcard(spec);
   }
 
   @VisibleForTesting
@@ -125,6 +133,12 @@ public class GcsUtil {
             rewriteDataOpBatchLimit,
             gcsCountersOptions.delegate,
             gcsOptions);
+
+    if (ExperimentalOptions.hasExperiment(gcsOptions, "use_gcsutil_v2")) {
+      this.delegateV2 = new GcsUtilV2(gcsOptions);
+    } else {
+      this.delegateV2 = null;
+    }
   }
 
   protected void setStorageClient(Storage storageClient) {
@@ -136,6 +150,9 @@ public class GcsUtil {
   }
 
   public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
+    if (delegateV2 != null) {
+      return delegateV2.expand(gcsPattern);
+    }
     return delegate.expand(gcsPattern);
   }
 
@@ -146,18 +163,34 @@ public class GcsUtil {
   }
 
   public long fileSize(GcsPath path) throws IOException {
+    if (delegateV2 != null) {
+      return delegateV2.fileSize(path);
+    }
     return delegate.fileSize(path);
   }
 
+  /** @deprecated use {@link #getBlob(GcsPath)}. */
+  @Deprecated
   public StorageObject getObject(GcsPath gcsPath) throws IOException {
     return delegate.getObject(gcsPath);
   }
 
+  /** @deprecated use {@link #getBlob(GcsPath, BlobGetOption...)}. */
+  @Deprecated
   @VisibleForTesting
   StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) 
throws IOException {
     return delegate.getObject(gcsPath, backoff, sleeper);
   }
 
+  public Blob getBlob(GcsPath gcsPath, BlobGetOption... options) throws 
IOException {
+    if (delegateV2 != null) {
+      return delegateV2.getBlob(gcsPath, options);
+    }
+    throw new IOException("GcsUtil V2 not initialized.");
+  }
+
+  /** @deprecated use {@link #getBlobs(Iterable, BlobGetOption...)}. */
+  @Deprecated
   public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths) 
throws IOException {
     List<GcsUtilV1.StorageObjectOrIOException> legacy = 
delegate.getObjects(gcsPaths);
     return legacy.stream()
@@ -165,17 +198,51 @@ public class GcsUtil {
         .collect(java.util.stream.Collectors.toList());
   }
 
+  public List<BlobResult> getBlobs(Iterable<GcsPath> gcsPaths, 
BlobGetOption... options)
+      throws IOException {
+    if (delegateV2 != null) {
+      return delegateV2.getBlobs(gcsPaths, options);
+    }
+    throw new IOException("GcsUtil V2 not initialized.");
+  }
+
+  /** @deprecated use {@link #listBlobs(String, String, String, 
BlobListOption...)}. */
+  @Deprecated
   public Objects listObjects(String bucket, String prefix, @Nullable String 
pageToken)
       throws IOException {
     return delegate.listObjects(bucket, prefix, pageToken);
   }
 
+  /** @deprecated use {@link #listBlobs(String, String, String, String, 
BlobListOption...)}. */
+  @Deprecated
   public Objects listObjects(
       String bucket, String prefix, @Nullable String pageToken, @Nullable 
String delimiter)
       throws IOException {
     return delegate.listObjects(bucket, prefix, pageToken, delimiter);
   }
 
+  public Page<Blob> listBlobs(
+      String bucket, String prefix, @Nullable String pageToken, 
BlobListOption... options)
+      throws IOException {
+    if (delegateV2 != null) {
+      return delegateV2.listBlobs(bucket, prefix, pageToken, options);
+    }
+    throw new IOException("GcsUtil V2 not initialized.");
+  }
+
+  public Page<Blob> listBlobs(
+      String bucket,
+      String prefix,
+      @Nullable String pageToken,
+      @Nullable String delimiter,
+      BlobListOption... options)
+      throws IOException {
+    if (delegateV2 != null) {
+      return delegateV2.listBlobs(bucket, prefix, pageToken, delimiter, 
options);
+    }
+    throw new IOException("GcsUtil V2 not initialized.");
+  }
+
   @VisibleForTesting
   List<Long> fileSizes(List<GcsPath> paths) throws IOException {
     return delegate.fileSizes(paths);
@@ -254,29 +321,69 @@ public class GcsUtil {
   }
 
   public void verifyBucketAccessible(GcsPath path) throws IOException {
+    if (delegateV2 != null) {
+      delegateV2.verifyBucketAccessible(path);
+      return;
+    }
     delegate.verifyBucketAccessible(path);
   }
 
   public boolean bucketAccessible(GcsPath path) throws IOException {
+    if (delegateV2 != null) {
+      return delegateV2.bucketAccessible(path);
+    }
     return delegate.bucketAccessible(path);
   }
 
   public long bucketOwner(GcsPath path) throws IOException {
+    if (delegateV2 != null) {
+      return delegateV2.bucketProject(path);
+    }
     return delegate.bucketOwner(path);
   }
 
+  /** @deprecated use {@link #createBucket(BucketInfo)}. */
+  @Deprecated
   public void createBucket(String projectId, Bucket bucket) throws IOException 
{
     delegate.createBucket(projectId, bucket);
   }
 
+  public void createBucket(BucketInfo bucketInfo) throws IOException {
+    if (delegateV2 != null) {
+      delegateV2.createBucket(bucketInfo);
+    } else {
+      throw new IOException("GcsUtil V2 not initialized.");
+    }
+  }
+
+  /** @deprecated use {@link #getBucketWithOptions(GcsPath, 
BucketGetOption...)} . */
+  @Deprecated
   public @Nullable Bucket getBucket(GcsPath path) throws IOException {
     return delegate.getBucket(path);
   }
 
+  public com.google.cloud.storage.@Nullable Bucket getBucketWithOptions(
+      GcsPath path, BucketGetOption... options) throws IOException {
+    if (delegateV2 != null) {
+      return delegateV2.getBucket(path, options);
+    }
+    throw new IOException("GcsUtil V2 not initialized.");
+  }
+
+  /** @deprecated use {@link #removeBucket(BucketInfo)}. */
+  @Deprecated
   public void removeBucket(Bucket bucket) throws IOException {
     delegate.removeBucket(bucket);
   }
 
+  public void removeBucket(BucketInfo bucketInfo) throws IOException {
+    if (delegateV2 != null) {
+      delegateV2.removeBucket(bucketInfo);
+    } else {
+      throw new IOException("GcsUtil V2 not initialized.");
+    }
+  }
+
   @VisibleForTesting
   boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) 
throws IOException {
     return delegate.bucketAccessible(path, backoff, sleeper);
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java
index c44eb36c263..1ade4be6fdb 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java
@@ -76,7 +76,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
@@ -165,9 +164,6 @@ class GcsUtilV1 {
   /** Maximum number of items to retrieve per Objects.List request. */
   private static final long MAX_LIST_ITEMS_PER_CALL = 1024;
 
-  /** Matches a glob containing a wildcard, capturing the portion before the 
first wildcard. */
-  private static final Pattern GLOB_PREFIX = 
Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");
-
   /** Maximum number of requests permitted in a GCS batch request. */
   private static final int MAX_REQUESTS_PER_BATCH = 100;
   /** Default maximum number of requests permitted in a GCS batch request 
where data is copied. */
@@ -224,18 +220,6 @@ class GcsUtilV1 {
 
   @VisibleForTesting @Nullable AtomicInteger numRewriteTokensUsed;
 
-  /** Returns the prefix portion of the glob that doesn't contain wildcards. */
-  public static String getNonWildcardPrefix(String globExp) {
-    Matcher m = GLOB_PREFIX.matcher(globExp);
-    checkArgument(m.matches(), String.format("Glob expression: [%s] is not 
expandable.", globExp));
-    return m.group("PREFIX");
-  }
-
-  /** Returns true if the given {@code spec} contains wildcard. */
-  public static boolean isWildcard(GcsPath spec) {
-    return GLOB_PREFIX.matcher(spec.getObject()).matches();
-  }
-
   @VisibleForTesting
   GcsUtilV1(
       Storage storageClient,
@@ -333,9 +317,9 @@ class GcsUtilV1 {
   public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
     Pattern p = null;
     String prefix = null;
-    if (isWildcard(gcsPattern)) {
+    if (GcsPath.isWildcard(gcsPattern)) {
       // Part before the first wildcard character.
-      prefix = getNonWildcardPrefix(gcsPattern.getObject());
+      prefix = GcsPath.getNonWildcardPrefix(gcsPattern.getObject());
       p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject()));
     } else {
       // Not a wildcard.
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
new file mode 100644
index 00000000000..a2df45511c9
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.util;
+
+import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.gax.paging.Page;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Bucket;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobField;
+import com.google.cloud.storage.Storage.BlobGetOption;
+import com.google.cloud.storage.Storage.BlobListOption;
+import com.google.cloud.storage.Storage.BucketField;
+import com.google.cloud.storage.Storage.BucketGetOption;
+import com.google.cloud.storage.StorageBatch;
+import com.google.cloud.storage.StorageBatchResult;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+class GcsUtilV2 {
+  private static final org.slf4j.Logger LOG = 
org.slf4j.LoggerFactory.getLogger(GcsUtilV2.class);
+
+  public static class GcsUtilFactory implements DefaultValueFactory<GcsUtilV2> 
{
+    @Override
+    public GcsUtilV2 create(PipelineOptions options) {
+      // GcsOptions gcsOptions = options.as(GcsOptions.class);
+      // Storage.Builder storageBuilder = 
Transport.newStorageClient(gcsOptions);
+      return new GcsUtilV2(options);
+    }
+  }
+
+  private Storage storage;
+
+  /** Maximum number of items to retrieve per Objects.List request. */
+  private static final long MAX_LIST_BLOBS_PER_CALL = 1024;
+
+  /** Maximum number of requests permitted in a GCS batch request. */
+  private static final int MAX_REQUESTS_PER_BATCH = 100;
+
+  GcsUtilV2(PipelineOptions options) {
+    String projectId = options.as(GcpOptions.class).getProject();
+    storage = 
StorageOptions.newBuilder().setProjectId(projectId).build().getService();
+  }
+
+  @SuppressWarnings({
+    "nullness" // For Creating AccessDeniedException and 
FileAlreadyExistsException with null.
+  })
+  private IOException translateStorageException(GcsPath gcsPath, 
StorageException e) {
+    switch (e.getCode()) {
+      case 403:
+        return new AccessDeniedException(gcsPath.toString(), null, 
e.getMessage());
+      case 409:
+        return new FileAlreadyExistsException(gcsPath.toString(), null, 
e.getMessage());
+      default:
+        return new IOException(e);
+    }
+  }
+
+  private IOException translateStorageException(
+      String bucketName, @Nullable String blobName, StorageException e) {
+    return translateStorageException(GcsPath.fromComponents(bucketName, 
blobName), e);
+  }
+
+  public Blob getBlob(GcsPath gcsPath, BlobGetOption... options) throws 
IOException {
+    try {
+      Blob blob = storage.get(gcsPath.getBucket(), gcsPath.getObject(), 
options);
+      if (blob == null) {
+        throw new FileNotFoundException(
+            String.format("The specified file does not exist: %s", 
gcsPath.toString()));
+      }
+      return blob;
+    } catch (StorageException e) {
+      throw translateStorageException(gcsPath, e);
+    }
+  }
+
+  public long fileSize(GcsPath gcsPath) throws IOException {
+    return getBlob(gcsPath, BlobGetOption.fields(BlobField.SIZE)).getSize();
+  }
+
+  /** A class that holds either a {@link Blob} or an {@link IOException}. */
+  @AutoValue
+  public abstract static class BlobResult {
+
+    /** Returns the {@link Blob}. */
+    public abstract @Nullable Blob blob();
+
+    /** Returns the {@link IOException}. */
+    public abstract @Nullable IOException ioException();
+
+    @VisibleForTesting
+    public static BlobResult create(Blob blob) {
+      return new AutoValue_GcsUtilV2_BlobResult(checkNotNull(blob, "blob"), 
null /* ioException */);
+    }
+
+    @VisibleForTesting
+    public static BlobResult create(IOException ioException) {
+      return new AutoValue_GcsUtilV2_BlobResult(
+          null /* blob */, checkNotNull(ioException, "ioException"));
+    }
+  }
+
+  public List<BlobResult> getBlobs(Iterable<GcsPath> gcsPaths, 
BlobGetOption... options)
+      throws IOException {
+
+    List<BlobResult> results = new ArrayList<>();
+
+    for (List<GcsPath> pathPartition :
+        Lists.partition(Lists.newArrayList(gcsPaths), MAX_REQUESTS_PER_BATCH)) 
{
+
+      // Create a new empty batch every time
+      StorageBatch batch = storage.batch();
+      List<StorageBatchResult<Blob>> batchResultFutures = new ArrayList<>();
+
+      for (GcsPath path : pathPartition) {
+        batchResultFutures.add(batch.get(path.getBucket(), path.getObject(), 
options));
+      }
+      batch.submit();
+
+      for (int i = 0; i < batchResultFutures.size(); i++) {
+        StorageBatchResult<Blob> future = batchResultFutures.get(i);
+        try {
+          Blob blob = future.get();
+          if (blob != null) {
+            results.add(BlobResult.create(blob));
+          } else {
+            results.add(
+                BlobResult.create(
+                    new FileNotFoundException(
+                        String.format(
+                            "The specified file does not exist: %s",
+                            pathPartition.get(i).toString()))));
+          }
+        } catch (StorageException e) {
+          
results.add(BlobResult.create(translateStorageException(pathPartition.get(i), 
e)));
+        }
+      }
+    }
+    return results;
+  }
+
+  /** Lists {@link Blob}s given the {@code bucket}, {@code prefix}, {@code 
pageToken}. */
+  public Page<Blob> listBlobs(
+      String bucket,
+      String prefix,
+      @Nullable String pageToken,
+      @Nullable String delimiter,
+      BlobListOption... options)
+      throws IOException {
+    List<BlobListOption> blobListOptions = new ArrayList<>();
+    blobListOptions.add(BlobListOption.pageSize(MAX_LIST_BLOBS_PER_CALL));
+    if (pageToken != null) {
+      blobListOptions.add(BlobListOption.pageToken(pageToken));
+    }
+    if (prefix != null) {
+      blobListOptions.add(BlobListOption.prefix(prefix));
+    }
+    if (delimiter != null) {
+      blobListOptions.add(BlobListOption.delimiter(delimiter));
+    }
+    if (options != null && options.length > 0) {
+      for (BlobListOption option : options) {
+        blobListOptions.add(option);
+      }
+    }
+
+    try {
+      return storage.list(bucket, blobListOptions.toArray(new 
BlobListOption[0]));
+    } catch (StorageException e) {
+      throw translateStorageException(bucket, prefix, e);
+    }
+  }
+
+  public Page<Blob> listBlobs(
+      String bucket, String prefix, @Nullable String pageToken, 
BlobListOption... options)
+      throws IOException {
+    return listBlobs(bucket, prefix, pageToken, null, options);
+  }
+
+  /**
+   * Expands a pattern into matched paths. The pattern path may contain globs, 
which are expanded in
+   * the result. For patterns that only match a single object, we ensure that 
the object exists.
+   */
+  public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
+    // Handle Non-Wildcard Path
+    if (!GcsPath.isWildcard(gcsPattern)) {
+      try {
+        // Use a get request to fetch the metadata of the object, and ignore 
the return value.
+        // The request has strong global consistency.
+        getBlob(gcsPattern, BlobGetOption.fields(BlobField.NAME));
+        return ImmutableList.of(gcsPattern);
+      } catch (FileNotFoundException e) {
+        // If the path was not found, return an empty list.
+        return ImmutableList.of();
+      }
+    }
+
+    // Handle Wildcard Path
+    // TODO: check out BlobListOption.matchGlob() for a similar function.
+    String prefix = GcsPath.getNonWildcardPrefix(gcsPattern.getObject());
+    Pattern p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject()));
+
+    LOG.debug(
+        "matching files in bucket {}, prefix {} against pattern {}",
+        gcsPattern.getBucket(),
+        prefix,
+        p.toString());
+
+    List<GcsPath> results = new ArrayList<>();
+    Page<Blob> blobs =
+        listBlobs(
+            gcsPattern.getBucket(),
+            prefix,
+            null,
+            BlobListOption.fields(BlobField.NAME, BlobField.BUCKET));
+    // Iterate through all elements page by page (lazily)
+    for (Blob b : blobs.iterateAll()) {
+      String name = b.getName();
+      // Filter objects based on the regex. Skip directories, which end with a 
slash.
+      if (p.matcher(name).matches() && !name.endsWith("/")) {
+        LOG.debug("Matched object: {}", name);
+        results.add(GcsPath.fromComponents(b.getBucket(), b.getName()));
+      }
+    }
+    return results;
+  }
+
+  /** Get the {@link Bucket} from Cloud Storage path or propagates an 
exception. */
+  public Bucket getBucket(GcsPath path, BucketGetOption... options) throws 
IOException {
+    String bucketName = path.getBucket();
+    try {
+      Bucket bucket = storage.get(bucketName, options);
+      if (bucket == null) {
+        throw new FileNotFoundException(
+            String.format("The specified bucket does not exist: gs://%s", 
bucketName));
+      }
+      return bucket;
+    } catch (StorageException e) {
+      throw translateStorageException(bucketName, null, e);
+    }
+  }
+
+  /** Returns whether the GCS bucket exists and is accessible. */
+  public boolean bucketAccessible(GcsPath path) {
+    try {
+      // Fetch only the name field to minimize data transfer
+      getBucket(path, BucketGetOption.fields(BucketField.NAME));
+      return true;
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Checks whether the GCS bucket exists. Similar to {@link 
#bucketAccessible(GcsPath)}, but throws
+   * exception if the bucket is inaccessible due to permissions or does not 
exist.
+   */
+  public void verifyBucketAccessible(GcsPath path) throws IOException {
+    // Fetch only the name field to minimize data transfer
+    getBucket(path, BucketGetOption.fields(BucketField.NAME));
+  }
+
+  /**
+   * Returns the project number of the project which owns this bucket. If the 
bucket exists, it must
+   * be accessible otherwise the permissions exception will be propagated. If 
the bucket does not
+   * exist, an exception will be thrown.
+   */
+  public long bucketProject(GcsPath path) throws IOException {
+    Bucket bucket = getBucket(path, 
BucketGetOption.fields(BucketField.PROJECT));
+    return bucket.getProject().longValue();
+  }
+
+  public void createBucket(BucketInfo bucketInfo) throws IOException {
+    try {
+      storage.create(bucketInfo);
+    } catch (StorageException e) {
+      throw translateStorageException(bucketInfo.getName(), null, e);
+    }
+  }
+
+  public void removeBucket(BucketInfo bucketInfo) throws IOException {
+    try {
+      if (!storage.delete(bucketInfo.getName())) {
+        throw new FileNotFoundException(
+            String.format("The specified bucket does not exist: gs://%s", 
bucketInfo.getName()));
+      }
+    } catch (StorageException e) {
+      throw translateStorageException(bucketInfo.getName(), null, e);
+    }
+  }
+}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java
index 29c312d7a5e..745ff4d3630 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java
@@ -129,6 +129,9 @@ public class GcsPath implements Path, Serializable {
   /** Pattern that is used to validate a GCS bucket name. */
   private static final Pattern GCS_BUCKET_NAME = 
Pattern.compile("[a-z0-9][-_a-z0-9.]+[a-z0-9]");
 
+  /** Matches a glob containing a wildcard, capturing the portion before the 
first wildcard. */
+  private static final Pattern GLOB_PREFIX = 
Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");
+
   /** Creates a GcsPath from a OnePlatform resource name in string form. */
   public static GcsPath fromResourceName(String name) {
     Matcher m = GCS_RESOURCE_NAME.matcher(name);
@@ -605,4 +608,16 @@ public class GcsPath implements Path, Serializable {
       return bucket + "/" + object;
     }
   }
+
+  /** Returns the prefix portion of the glob before the first wildcard 
character. */
+  public static String getNonWildcardPrefix(String globExp) {
+    Matcher m = GLOB_PREFIX.matcher(globExp);
+    checkArgument(m.matches(), String.format("Glob expression: [%s] is not 
expandable.", globExp));
+    return m.group("PREFIX");
+  }
+
+  /** Returns true if the given {@code spec} contains wildcard. */
+  public static boolean isWildcard(GcsPath spec) {
+    return GLOB_PREFIX.matcher(spec.getObject()).matches();
+  }
 }
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
index 89c60632084..8af5e2260fc 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
@@ -50,20 +50,31 @@ public class GcpCoreApiSurfaceTest {
     @SuppressWarnings("unchecked")
     final Set<Matcher<Class<?>>> allowedClasses =
         ImmutableSet.of(
+            classesInPackage("com.fasterxml.jackson.annotation"),
             classesInPackage("com.google.api.client.googleapis"),
             classesInPackage("com.google.api.client.http"),
             classesInPackage("com.google.api.client.json"),
             classesInPackage("com.google.api.client.util"),
+            classesInPackage("com.google.api.core"),
+            classesInPackage("com.google.api.gax"),
             classesInPackage("com.google.api.services.storage"),
             classesInPackage("com.google.auth"),
-            classesInPackage("com.fasterxml.jackson.annotation"),
+            classesInPackage("com.google.cloud"),
+            classesInPackage("com.google.errorprone"),
+            classesInPackage("com.google.iam"),
+            classesInPackage("com.google.protobuf"),
+            classesInPackage("com.google.storage"),
+            classesInPackage("com.google.type"),
             classesInPackage("com.google.cloud.hadoop.gcsio"),
             classesInPackage("com.google.common.collect"), // Via 
gcs-connector ReadOptions builder
+            classesInPackage("io.grpc"),
+            classesInPackage("io.opentelemetry"),
             classesInPackage("java"),
             classesInPackage("javax"),
             classesInPackage("org.apache.beam.model.pipeline.v1"),
             classesInPackage("org.apache.beam.sdk"),
-            classesInPackage("org.joda.time"));
+            classesInPackage("org.joda.time"),
+            classesInPackage("org.threeten.bp"));
 
     assertThat(apiSurface, containsOnlyClassesMatching(allowedClasses));
   }
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
new file mode 100644
index 00000000000..db5097c9515
--- /dev/null
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.gax.paging.Page;
+import com.google.api.services.storage.model.Bucket;
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BucketInfo;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testing.UsesKms;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Integration tests for {@link GcsUtil}, parameterized over the {@code use_gcsutil_v1} and {@code
+ * use_gcsutil_v2} experiments so that both implementations are exercised. These tests are designed
+ * to run against production Google Cloud Storage.
+ *
+ * <p>This is a runnerless integration test, even though the Beam IT framework assumes one. Thus,
+ * this test should only be run against a single runner (such as DirectRunner).
+ */
+@RunWith(Parameterized.class)
+@Category(UsesKms.class)
+public class GcsUtilParameterizedIT {
+
+  private static final String V1_EXPERIMENT = "use_gcsutil_v1";
+  private static final String V2_EXPERIMENT = "use_gcsutil_v2";
+
+  /** Bucket holding the read-only fixtures used by these tests. */
+  private static final String SAMPLES_BUCKET_URI = "gs://apache-beam-samples";
+
+  /** Fixture object whose size and CRC32C are asserted below. */
+  private static final String KING_LEAR_URI = SAMPLES_BUCKET_URI + "/shakespeare/kinglear.txt";
+
+  /** A bucket these tests expect not to exist. */
+  private static final String NON_EXISTENT_BUCKET_URI = "gs://my-random-test-bucket-12345";
+
+  /** A bucket these tests expect the test credentials to be denied access to. */
+  private static final String FORBIDDEN_BUCKET_URI = "gs://test-bucket";
+
+  @Parameters(name = "{0}")
+  public static Iterable<String> data() {
+    return Arrays.asList(V1_EXPERIMENT, V2_EXPERIMENT);
+  }
+
+  @Parameter public String experiment;
+
+  private TestPipelineOptions options;
+  private GcsUtil gcsUtil;
+
+  @Before
+  public void setUp() {
+    options = TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
+
+    // Select the GcsUtil implementation under test via the experiments flag.
+    options.as(ExperimentalOptions.class).setExperiments(Collections.singletonList(experiment));
+
+    gcsUtil = options.as(GcsOptions.class).getGcsUtil();
+  }
+
+  /** Returns true when this parameterized run exercises the Blob-based V2 code path. */
+  private boolean isV2() {
+    return V2_EXPERIMENT.equals(experiment);
+  }
+
+  @Test
+  public void testFileSize() throws IOException {
+    final GcsPath gcsPath = GcsPath.fromUri(KING_LEAR_URI);
+    final long expectedSize = 157283L;
+
+    assertEquals(expectedSize, gcsUtil.fileSize(gcsPath));
+  }
+
+  @Test
+  public void testGetObjectOrGetBlob() throws IOException {
+    final GcsPath existingPath = GcsPath.fromUri(KING_LEAR_URI);
+    final String expectedCRC = "s0a3Tg==";
+
+    String crc;
+    if (isV2()) {
+      Blob blob = gcsUtil.getBlob(existingPath);
+      crc = blob.getCrc32c();
+    } else {
+      StorageObject obj = gcsUtil.getObject(existingPath);
+      crc = obj.getCrc32c();
+    }
+    assertEquals(expectedCRC, crc);
+
+    final GcsPath nonExistentPath = GcsPath.fromUri(NON_EXISTENT_BUCKET_URI + "/unknown-12345.txt");
+    final GcsPath forbiddenPath = GcsPath.fromUri(FORBIDDEN_BUCKET_URI + "/unknown-12345.txt");
+
+    if (isV2()) {
+      assertThrows(FileNotFoundException.class, () -> gcsUtil.getBlob(nonExistentPath));
+      // For V2, forbidden paths surface as AccessDeniedException (a subclass of IOException).
+      assertThrows(AccessDeniedException.class, () -> gcsUtil.getBlob(forbiddenPath));
+    } else {
+      assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(nonExistentPath));
+      assertThrows(IOException.class, () -> gcsUtil.getObject(forbiddenPath));
+    }
+  }
+
+  @Test
+  public void testGetObjectsOrGetBlobs() throws IOException {
+    final GcsPath existingPath = GcsPath.fromUri(KING_LEAR_URI);
+    final GcsPath nonExistentPath = GcsPath.fromUri(NON_EXISTENT_BUCKET_URI + "/unknown-12345.txt");
+    final List<GcsPath> paths = Arrays.asList(existingPath, nonExistentPath);
+
+    if (isV2()) {
+      List<GcsUtilV2.BlobResult> results = gcsUtil.getBlobs(paths);
+      assertEquals(2, results.size());
+      assertNotNull(results.get(0).blob());
+      assertNull(results.get(0).ioException());
+      assertNull(results.get(1).blob());
+      assertNotNull(results.get(1).ioException());
+    } else {
+      List<GcsUtil.StorageObjectOrIOException> results = gcsUtil.getObjects(paths);
+      assertEquals(2, results.size());
+      assertNotNull(results.get(0).storageObject());
+      assertNull(results.get(0).ioException());
+      assertNull(results.get(1).storageObject());
+      assertNotNull(results.get(1).ioException());
+    }
+  }
+
+  @Test
+  public void testListObjectsOrListBlobs() throws IOException {
+    final String bucket = "apache-beam-samples";
+    final String prefix = "shakespeare/kingrichard";
+
+    List<String> names;
+    if (isV2()) {
+      Page<Blob> blobs = gcsUtil.listBlobs(bucket, prefix, null);
+      names = blobs.streamAll().map(Blob::getName).collect(Collectors.toList());
+    } else {
+      Objects objs = gcsUtil.listObjects(bucket, prefix, null);
+      names = objs.getItems().stream().map(StorageObject::getName).collect(Collectors.toList());
+    }
+    assertEquals(
+        Arrays.asList("shakespeare/kingrichardii.txt", "shakespeare/kingrichardiii.txt"), names);
+
+    final String randomPrefix = "my-random-prefix/random";
+    if (isV2()) {
+      Page<Blob> blobs = gcsUtil.listBlobs(bucket, randomPrefix, null);
+      assertEquals(0, blobs.streamAll().count());
+    } else {
+      // The V1 API reports "no matches" as a null item list rather than an empty one.
+      Objects objs = gcsUtil.listObjects(bucket, randomPrefix, null);
+      assertNull(objs.getItems());
+    }
+  }
+
+  @Test
+  public void testExpand() throws IOException {
+    final GcsPath existingPattern =
+        GcsPath.fromUri(SAMPLES_BUCKET_URI + "/shakespeare/kingrichardii*.txt");
+    List<GcsPath> paths = gcsUtil.expand(existingPattern);
+
+    assertEquals(
+        Arrays.asList(
+            GcsPath.fromUri(SAMPLES_BUCKET_URI + "/shakespeare/kingrichardii.txt"),
+            GcsPath.fromUri(SAMPLES_BUCKET_URI + "/shakespeare/kingrichardiii.txt")),
+        paths);
+
+    final GcsPath nonExistentPattern1 =
+        GcsPath.fromUri(SAMPLES_BUCKET_URI + "/my_random_folder/random*.txt");
+    assertTrue(gcsUtil.expand(nonExistentPattern1).isEmpty());
+
+    final GcsPath nonExistentPattern2 = GcsPath.fromUri(SAMPLES_BUCKET_URI + "/shakespeare/king*.csv");
+    assertTrue(gcsUtil.expand(nonExistentPattern2).isEmpty());
+  }
+
+  @Test
+  public void testGetBucketOrGetBucketWithOptions() throws IOException {
+    final GcsPath existingPath = GcsPath.fromUri(SAMPLES_BUCKET_URI);
+
+    String bucket;
+    if (isV2()) {
+      bucket = gcsUtil.getBucketWithOptions(existingPath).getName();
+    } else {
+      bucket = gcsUtil.getBucket(existingPath).getName();
+    }
+    assertEquals("apache-beam-samples", bucket);
+
+    final GcsPath nonExistentPath = GcsPath.fromUri(NON_EXISTENT_BUCKET_URI);
+    final GcsPath forbiddenPath = GcsPath.fromUri(FORBIDDEN_BUCKET_URI);
+
+    if (isV2()) {
+      assertThrows(
+          FileNotFoundException.class, () -> gcsUtil.getBucketWithOptions(nonExistentPath));
+      assertThrows(AccessDeniedException.class, () -> gcsUtil.getBucketWithOptions(forbiddenPath));
+    } else {
+      assertThrows(FileNotFoundException.class, () -> gcsUtil.getBucket(nonExistentPath));
+      assertThrows(AccessDeniedException.class, () -> gcsUtil.getBucket(forbiddenPath));
+    }
+  }
+
+  @Test
+  public void testBucketAccessible() throws IOException {
+    assertTrue(gcsUtil.bucketAccessible(GcsPath.fromUri(SAMPLES_BUCKET_URI)));
+    assertFalse(gcsUtil.bucketAccessible(GcsPath.fromUri(NON_EXISTENT_BUCKET_URI)));
+    assertFalse(gcsUtil.bucketAccessible(GcsPath.fromUri(FORBIDDEN_BUCKET_URI)));
+  }
+
+  @Test
+  public void testBucketOwner() throws IOException {
+    final GcsPath existingPath = GcsPath.fromUri(SAMPLES_BUCKET_URI);
+    final long expectedProjectNumber = 844138762903L; // apache-beam-testing
+    assertEquals(expectedProjectNumber, gcsUtil.bucketOwner(existingPath));
+
+    final GcsPath nonExistentPath = GcsPath.fromUri(NON_EXISTENT_BUCKET_URI);
+    final GcsPath forbiddenPath = GcsPath.fromUri(FORBIDDEN_BUCKET_URI);
+    assertThrows(FileNotFoundException.class, () -> gcsUtil.bucketOwner(nonExistentPath));
+    assertThrows(AccessDeniedException.class, () -> gcsUtil.bucketOwner(forbiddenPath));
+  }
+
+  @Test
+  public void testCreateAndRemoveBucket() throws IOException {
+    // Bucket names share one global namespace. A fixed name would let concurrent runs of this test
+    // (e.g. from different CI jobs) observe, and in the finally block delete, each other's bucket.
+    final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-test-bucket-" + UUID.randomUUID());
+
+    if (isV2()) {
+      BucketInfo bucketInfo = BucketInfo.of(gcsPath.getBucket());
+      try {
+        assertFalse(gcsUtil.bucketAccessible(gcsPath));
+        gcsUtil.createBucket(bucketInfo);
+        assertTrue(gcsUtil.bucketAccessible(gcsPath));
+
+        // raise exception when the bucket already exists during creation
+        assertThrows(FileAlreadyExistsException.class, () -> gcsUtil.createBucket(bucketInfo));
+
+        assertTrue(gcsUtil.bucketAccessible(gcsPath));
+        gcsUtil.removeBucket(bucketInfo);
+        assertFalse(gcsUtil.bucketAccessible(gcsPath));
+
+        // raise exception when the bucket does not exist during removal
+        assertThrows(FileNotFoundException.class, () -> gcsUtil.removeBucket(bucketInfo));
+      } finally {
+        try {
+          gcsUtil.removeBucket(bucketInfo);
+        } catch (IOException ignored) {
+          // Best-effort cleanup: on success the bucket was already removed above, so a failure
+          // here is expected and must not mask the test's own outcome.
+        }
+      }
+    } else {
+      Bucket bucket = new Bucket().setName(gcsPath.getBucket());
+      String projectId = options.as(GcsOptions.class).getProject();
+      try {
+        assertFalse(gcsUtil.bucketAccessible(gcsPath));
+        gcsUtil.createBucket(projectId, bucket);
+        assertTrue(gcsUtil.bucketAccessible(gcsPath));
+
+        // raise exception when the bucket already exists during creation
+        assertThrows(
+            FileAlreadyExistsException.class, () -> gcsUtil.createBucket(projectId, bucket));
+
+        assertTrue(gcsUtil.bucketAccessible(gcsPath));
+        gcsUtil.removeBucket(bucket);
+        assertFalse(gcsUtil.bucketAccessible(gcsPath));
+
+        // raise exception when the bucket does not exist during removal
+        assertThrows(FileNotFoundException.class, () -> gcsUtil.removeBucket(bucket));
+      } finally {
+        try {
+          gcsUtil.removeBucket(bucket);
+        } catch (IOException ignored) {
+          // Best-effort cleanup: on success the bucket was already removed above, so a failure
+          // here is expected and must not mask the test's own outcome.
+        }
+      }
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
index ea047b0c6ea..f344c6c2dba 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
@@ -348,4 +348,24 @@ public class GcsPathTest {
     a.subpath(1, 1); // throws IllegalArgumentException
     Assert.fail();
   }
+
+  @Test
+  public void testIsWildcard() {
+    assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo*")));
+    assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo?")));
+    assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo[a-z]")));
+    // A wildcard in a non-final path segment still makes the whole path a wildcard.
+    assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/dir*/foo")));
+    assertFalse(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo")));
+    assertFalse(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/dir/foo.txt")));
+  }
+
+  @Test
+  public void testGetNonWildcardPrefix() {
+    assertEquals("gs://bucket/foo", GcsPath.getNonWildcardPrefix("gs://bucket/foo*"));
+    assertEquals("gs://bucket/foo", GcsPath.getNonWildcardPrefix("gs://bucket/foo?"));
+    assertEquals("gs://bucket/foo", GcsPath.getNonWildcardPrefix("gs://bucket/foo[a-z]"));
+    // Only the text before the FIRST wildcard is returned.
+    assertEquals("gs://bucket/dir/", GcsPath.getNonWildcardPrefix("gs://bucket/dir/*/file?.txt"));
+    // A leading wildcard yields an empty prefix.
+    assertEquals("", GcsPath.getNonWildcardPrefix("*.txt"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetNonWildcardPrefix_noWildcard() {
+    // A path without any wildcard character has no non-wildcard prefix to extract.
+    GcsPath.getNonWildcardPrefix("gs://bucket/foo/bar"); // throws IllegalArgumentException
+    Assert.fail();
+  }
 }


Reply via email to