This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 72ce2074caf GCS client library migration in Java SDK - part 2 (#37502)
72ce2074caf is described below
commit 72ce2074cafef9b0af08ce5d28d12fbe0e8b99fe
Author: Shunping Huang <[email protected]>
AuthorDate: Thu Feb 12 22:45:18 2026 -0500
GCS client library migration in Java SDK - part 2 (#37502)
* Migrate fileSize() and getObject()
* Move getNonWildcardPrefix() and isWildcard() from GcsUtilV1 to GcsPath
* Migrate listObjects()
* Migrate getBucket(), bucketAccessible(), verifyBucketAccessible().
* Add tests and extract setUp function.
* Migrate bucketOwner()
* Migrate createBucket() and removeBucket()
* Refactor exception handling.
* Change returned value of listBlobs. Migrate expand().
* Add deprecated annotation to some V1 apis. Refactor expand().
* Add exception handling for listBlobs()
* Add some more tests and comments.
* Fetch only the required field to minimize data transfer.
* Migrate getObjects() and refactor some method arguments.
* Refactor translateStorageException
* Control size of the batch for getBlobs.
* Formatting
* Fix GcpCoreApiSurfaceTest
* Add java deps.
* Change getBucketV2 to getBucketWithOptions. Fix GcpCoreApiSurfaceTest.
* Put the new IT tests into a different file.
* Revise according to gemini review.
* Change argument from List to Iterable in getBlobs()
* Remove redundant comments.
* Minor change.
* Revise according to review.
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 1 +
.../google-cloud-platform-core/build.gradle | 3 +
.../beam/sdk/extensions/gcp/util/GcsUtil.java | 111 ++++++-
.../beam/sdk/extensions/gcp/util/GcsUtilV1.java | 20 +-
.../beam/sdk/extensions/gcp/util/GcsUtilV2.java | 325 +++++++++++++++++++++
.../sdk/extensions/gcp/util/gcsfs/GcsPath.java | 15 +
.../sdk/extensions/gcp/GcpCoreApiSurfaceTest.java | 15 +-
.../gcp/util/GcsUtilParameterizedIT.java | 300 +++++++++++++++++++
.../sdk/extensions/gcp/util/gcsfs/GcsPathTest.java | 20 ++
9 files changed, 788 insertions(+), 22 deletions(-)
diff --git
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 774674b545e..df2db7df71b 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -766,6 +766,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_cloud_spanner_bom :
"com.google.cloud:google-cloud-spanner-bom:$google_cloud_spanner_version",
google_cloud_spanner :
"com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom
sets version
google_cloud_spanner_test :
"com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
+ google_cloud_storage :
"com.google.cloud:google-cloud-storage", // google_cloud_platform_libraries_bom
sets version
google_cloud_tink :
"com.google.crypto.tink:tink:1.19.0",
google_cloud_vertexai :
"com.google.cloud:google-cloud-vertexai", //
google_cloud_platform_libraries_bom sets version
google_code_gson :
"com.google.code.gson:gson:$google_code_gson_version",
diff --git a/sdks/java/extensions/google-cloud-platform-core/build.gradle
b/sdks/java/extensions/google-cloud-platform-core/build.gradle
index 8d21df50006..f1bfb63c7a3 100644
--- a/sdks/java/extensions/google-cloud-platform-core/build.gradle
+++ b/sdks/java/extensions/google-cloud-platform-core/build.gradle
@@ -39,9 +39,12 @@ dependencies {
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":runners:core-java")
+ implementation library.java.gax
implementation library.java.google_http_client_gson
implementation library.java.google_auth_library_oauth2_http
implementation library.java.google_api_client
+ implementation library.java.google_cloud_core
+ implementation library.java.google_cloud_storage
implementation library.java.bigdataoss_gcsio
implementation library.java.bigdataoss_util
implementation library.java.google_api_services_cloudresourcemanager
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index 220c08c6a7f..a2fdf24e9fb 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -20,11 +20,17 @@ package org.apache.beam.sdk.extensions.gcp.util;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
+import com.google.api.gax.paging.Page;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.auth.Credentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage.BlobGetOption;
+import com.google.cloud.storage.Storage.BlobListOption;
+import com.google.cloud.storage.Storage.BucketGetOption;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
@@ -34,6 +40,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.BlobResult;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -44,6 +51,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
public class GcsUtil {
@VisibleForTesting GcsUtilV1 delegate;
+ @VisibleForTesting @Nullable GcsUtilV2 delegateV2;
public static class GcsCountersOptions {
final GcsUtilV1.GcsCountersOptions delegate;
@@ -96,11 +104,11 @@ public class GcsUtil {
}
public static String getNonWildcardPrefix(String globExp) {
- return GcsUtilV1.getNonWildcardPrefix(globExp);
+ return GcsPath.getNonWildcardPrefix(globExp);
}
public static boolean isWildcard(GcsPath spec) {
- return GcsUtilV1.isWildcard(spec);
+ return GcsPath.isWildcard(spec);
}
@VisibleForTesting
@@ -125,6 +133,12 @@ public class GcsUtil {
rewriteDataOpBatchLimit,
gcsCountersOptions.delegate,
gcsOptions);
+
+ if (ExperimentalOptions.hasExperiment(gcsOptions, "use_gcsutil_v2")) {
+ this.delegateV2 = new GcsUtilV2(gcsOptions);
+ } else {
+ this.delegateV2 = null;
+ }
}
protected void setStorageClient(Storage storageClient) {
@@ -136,6 +150,9 @@ public class GcsUtil {
}
public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
+ if (delegateV2 != null) {
+ return delegateV2.expand(gcsPattern);
+ }
return delegate.expand(gcsPattern);
}
@@ -146,18 +163,34 @@ public class GcsUtil {
}
public long fileSize(GcsPath path) throws IOException {
+ if (delegateV2 != null) {
+ return delegateV2.fileSize(path);
+ }
return delegate.fileSize(path);
}
+ /** @deprecated use {@link #getBlob(GcsPath, BlobGetOption...)}. */
+ @Deprecated
public StorageObject getObject(GcsPath gcsPath) throws IOException {
return delegate.getObject(gcsPath);
}
+ /** @deprecated use {@link #getBlob(GcsPath, BlobGetOption...)}. */
+ @Deprecated
@VisibleForTesting
StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper)
throws IOException {
return delegate.getObject(gcsPath, backoff, sleeper);
}
+ public Blob getBlob(GcsPath gcsPath, BlobGetOption... options) throws
IOException {
+ if (delegateV2 != null) {
+ return delegateV2.getBlob(gcsPath, options);
+ }
+ throw new IOException("GcsUtil V2 not initialized.");
+ }
+
+ /** @deprecated use {@link #getBlobs(Iterable, BlobGetOption...)}. */
+ @Deprecated
public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths)
throws IOException {
List<GcsUtilV1.StorageObjectOrIOException> legacy =
delegate.getObjects(gcsPaths);
return legacy.stream()
@@ -165,17 +198,51 @@ public class GcsUtil {
.collect(java.util.stream.Collectors.toList());
}
+ public List<BlobResult> getBlobs(Iterable<GcsPath> gcsPaths,
BlobGetOption... options)
+ throws IOException {
+ if (delegateV2 != null) {
+ return delegateV2.getBlobs(gcsPaths, options);
+ }
+ throw new IOException("GcsUtil V2 not initialized.");
+ }
+
+ /** @deprecated use {@link #listBlobs(String, String, String,
BlobListOption...)}. */
+ @Deprecated
public Objects listObjects(String bucket, String prefix, @Nullable String
pageToken)
throws IOException {
return delegate.listObjects(bucket, prefix, pageToken);
}
+ /** @deprecated use {@link #listBlobs(String, String, String, String,
BlobListOption...)}. */
+ @Deprecated
public Objects listObjects(
String bucket, String prefix, @Nullable String pageToken, @Nullable
String delimiter)
throws IOException {
return delegate.listObjects(bucket, prefix, pageToken, delimiter);
}
+ public Page<Blob> listBlobs(
+ String bucket, String prefix, @Nullable String pageToken,
BlobListOption... options)
+ throws IOException {
+ if (delegateV2 != null) {
+ return delegateV2.listBlobs(bucket, prefix, pageToken, options);
+ }
+ throw new IOException("GcsUtil V2 not initialized.");
+ }
+
+ public Page<Blob> listBlobs(
+ String bucket,
+ String prefix,
+ @Nullable String pageToken,
+ @Nullable String delimiter,
+ BlobListOption... options)
+ throws IOException {
+ if (delegateV2 != null) {
+ return delegateV2.listBlobs(bucket, prefix, pageToken, delimiter,
options);
+ }
+ throw new IOException("GcsUtil V2 not initialized.");
+ }
+
@VisibleForTesting
List<Long> fileSizes(List<GcsPath> paths) throws IOException {
return delegate.fileSizes(paths);
@@ -254,29 +321,69 @@ public class GcsUtil {
}
public void verifyBucketAccessible(GcsPath path) throws IOException {
+ if (delegateV2 != null) {
+ delegateV2.verifyBucketAccessible(path);
+ return;
+ }
delegate.verifyBucketAccessible(path);
}
public boolean bucketAccessible(GcsPath path) throws IOException {
+ if (delegateV2 != null) {
+ return delegateV2.bucketAccessible(path);
+ }
return delegate.bucketAccessible(path);
}
public long bucketOwner(GcsPath path) throws IOException {
+ if (delegateV2 != null) {
+ return delegateV2.bucketProject(path);
+ }
return delegate.bucketOwner(path);
}
+ /** @deprecated use {@link #createBucket(BucketInfo)}. */
+ @Deprecated
public void createBucket(String projectId, Bucket bucket) throws IOException
{
delegate.createBucket(projectId, bucket);
}
+ public void createBucket(BucketInfo bucketInfo) throws IOException {
+ if (delegateV2 != null) {
+ delegateV2.createBucket(bucketInfo);
+ } else {
+ throw new IOException("GcsUtil V2 not initialized.");
+ }
+ }
+
+ /** @deprecated use {@link #getBucketWithOptions(GcsPath,
BucketGetOption...)}. */
+ @Deprecated
public @Nullable Bucket getBucket(GcsPath path) throws IOException {
return delegate.getBucket(path);
}
+ public com.google.cloud.storage.@Nullable Bucket getBucketWithOptions(
+ GcsPath path, BucketGetOption... options) throws IOException {
+ if (delegateV2 != null) {
+ return delegateV2.getBucket(path, options);
+ }
+ throw new IOException("GcsUtil V2 not initialized.");
+ }
+
+ /** @deprecated use {@link #removeBucket(BucketInfo)}. */
+ @Deprecated
public void removeBucket(Bucket bucket) throws IOException {
delegate.removeBucket(bucket);
}
+ public void removeBucket(BucketInfo bucketInfo) throws IOException {
+ if (delegateV2 != null) {
+ delegateV2.removeBucket(bucketInfo);
+ } else {
+ throw new IOException("GcsUtil V2 not initialized.");
+ }
+ }
+
@VisibleForTesting
boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper)
throws IOException {
return delegate.bucketAccessible(path, backoff, sleeper);
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java
index c44eb36c263..1ade4be6fdb 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java
@@ -76,7 +76,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
-import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
@@ -165,9 +164,6 @@ class GcsUtilV1 {
/** Maximum number of items to retrieve per Objects.List request. */
private static final long MAX_LIST_ITEMS_PER_CALL = 1024;
- /** Matches a glob containing a wildcard, capturing the portion before the
first wildcard. */
- private static final Pattern GLOB_PREFIX =
Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");
-
/** Maximum number of requests permitted in a GCS batch request. */
private static final int MAX_REQUESTS_PER_BATCH = 100;
/** Default maximum number of requests permitted in a GCS batch request
where data is copied. */
@@ -224,18 +220,6 @@ class GcsUtilV1 {
@VisibleForTesting @Nullable AtomicInteger numRewriteTokensUsed;
- /** Returns the prefix portion of the glob that doesn't contain wildcards. */
- public static String getNonWildcardPrefix(String globExp) {
- Matcher m = GLOB_PREFIX.matcher(globExp);
- checkArgument(m.matches(), String.format("Glob expression: [%s] is not
expandable.", globExp));
- return m.group("PREFIX");
- }
-
- /** Returns true if the given {@code spec} contains wildcard. */
- public static boolean isWildcard(GcsPath spec) {
- return GLOB_PREFIX.matcher(spec.getObject()).matches();
- }
-
@VisibleForTesting
GcsUtilV1(
Storage storageClient,
@@ -333,9 +317,9 @@ class GcsUtilV1 {
public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
Pattern p = null;
String prefix = null;
- if (isWildcard(gcsPattern)) {
+ if (GcsPath.isWildcard(gcsPattern)) {
// Part before the first wildcard character.
- prefix = getNonWildcardPrefix(gcsPattern.getObject());
+ prefix = GcsPath.getNonWildcardPrefix(gcsPattern.getObject());
p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject()));
} else {
// Not a wildcard.
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
new file mode 100644
index 00000000000..a2df45511c9
--- /dev/null
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.util;
+
+import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.gax.paging.Page;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Bucket;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobField;
+import com.google.cloud.storage.Storage.BlobGetOption;
+import com.google.cloud.storage.Storage.BlobListOption;
+import com.google.cloud.storage.Storage.BucketField;
+import com.google.cloud.storage.Storage.BucketGetOption;
+import com.google.cloud.storage.StorageBatch;
+import com.google.cloud.storage.StorageBatchResult;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+class GcsUtilV2 {
+ private static final org.slf4j.Logger LOG =
org.slf4j.LoggerFactory.getLogger(GcsUtilV2.class);
+
+ public static class GcsUtilFactory implements DefaultValueFactory<GcsUtilV2>
{
+ @Override
+ public GcsUtilV2 create(PipelineOptions options) {
+ // GcsOptions gcsOptions = options.as(GcsOptions.class);
+ // Storage.Builder storageBuilder =
Transport.newStorageClient(gcsOptions);
+ return new GcsUtilV2(options);
+ }
+ }
+
+ private Storage storage;
+
+ /** Maximum number of items to retrieve per Objects.List request. */
+ private static final long MAX_LIST_BLOBS_PER_CALL = 1024;
+
+ /** Maximum number of requests permitted in a GCS batch request. */
+ private static final int MAX_REQUESTS_PER_BATCH = 100;
+
+ GcsUtilV2(PipelineOptions options) {
+ String projectId = options.as(GcpOptions.class).getProject();
+ storage =
StorageOptions.newBuilder().setProjectId(projectId).build().getService();
+ }
+
+ @SuppressWarnings({
+ "nullness" // For Creating AccessDeniedException and
FileAlreadyExistsException with null.
+ })
+ private IOException translateStorageException(GcsPath gcsPath,
StorageException e) {
+ switch (e.getCode()) {
+ case 403:
+ return new AccessDeniedException(gcsPath.toString(), null,
e.getMessage());
+ case 409:
+ return new FileAlreadyExistsException(gcsPath.toString(), null,
e.getMessage());
+ default:
+ return new IOException(e);
+ }
+ }
+
+ private IOException translateStorageException(
+ String bucketName, @Nullable String blobName, StorageException e) {
+ return translateStorageException(GcsPath.fromComponents(bucketName,
blobName), e);
+ }
+
+ public Blob getBlob(GcsPath gcsPath, BlobGetOption... options) throws
IOException {
+ try {
+ Blob blob = storage.get(gcsPath.getBucket(), gcsPath.getObject(),
options);
+ if (blob == null) {
+ throw new FileNotFoundException(
+ String.format("The specified file does not exist: %s",
gcsPath.toString()));
+ }
+ return blob;
+ } catch (StorageException e) {
+ throw translateStorageException(gcsPath, e);
+ }
+ }
+
+ public long fileSize(GcsPath gcsPath) throws IOException {
+ return getBlob(gcsPath, BlobGetOption.fields(BlobField.SIZE)).getSize();
+ }
+
+ /** A class that holds either a {@link Blob} or an {@link IOException}. */
+ @AutoValue
+ public abstract static class BlobResult {
+
+ /** Returns the {@link Blob}. */
+ public abstract @Nullable Blob blob();
+
+ /** Returns the {@link IOException}. */
+ public abstract @Nullable IOException ioException();
+
+ @VisibleForTesting
+ public static BlobResult create(Blob blob) {
+ return new AutoValue_GcsUtilV2_BlobResult(checkNotNull(blob, "blob"),
null /* ioException */);
+ }
+
+ @VisibleForTesting
+ public static BlobResult create(IOException ioException) {
+ return new AutoValue_GcsUtilV2_BlobResult(
+ null /* blob */, checkNotNull(ioException, "ioException"));
+ }
+ }
+
+ public List<BlobResult> getBlobs(Iterable<GcsPath> gcsPaths,
BlobGetOption... options)
+ throws IOException {
+
+ List<BlobResult> results = new ArrayList<>();
+
+ for (List<GcsPath> pathPartition :
+ Lists.partition(Lists.newArrayList(gcsPaths), MAX_REQUESTS_PER_BATCH))
{
+
+ // Create a new empty batch every time
+ StorageBatch batch = storage.batch();
+ List<StorageBatchResult<Blob>> batchResultFutures = new ArrayList<>();
+
+ for (GcsPath path : pathPartition) {
+ batchResultFutures.add(batch.get(path.getBucket(), path.getObject(),
options));
+ }
+ batch.submit();
+
+ for (int i = 0; i < batchResultFutures.size(); i++) {
+ StorageBatchResult<Blob> future = batchResultFutures.get(i);
+ try {
+ Blob blob = future.get();
+ if (blob != null) {
+ results.add(BlobResult.create(blob));
+ } else {
+ results.add(
+ BlobResult.create(
+ new FileNotFoundException(
+ String.format(
+ "The specified file does not exist: %s",
+ pathPartition.get(i).toString()))));
+ }
+ } catch (StorageException e) {
+
results.add(BlobResult.create(translateStorageException(pathPartition.get(i),
e)));
+ }
+ }
+ }
+ return results;
+ }
+
+ /** Lists {@link Blob}s given the {@code bucket}, {@code prefix}, {@code
pageToken}, and an optional {@code delimiter}. */
+ public Page<Blob> listBlobs(
+ String bucket,
+ String prefix,
+ @Nullable String pageToken,
+ @Nullable String delimiter,
+ BlobListOption... options)
+ throws IOException {
+ List<BlobListOption> blobListOptions = new ArrayList<>();
+ blobListOptions.add(BlobListOption.pageSize(MAX_LIST_BLOBS_PER_CALL));
+ if (pageToken != null) {
+ blobListOptions.add(BlobListOption.pageToken(pageToken));
+ }
+ if (prefix != null) {
+ blobListOptions.add(BlobListOption.prefix(prefix));
+ }
+ if (delimiter != null) {
+ blobListOptions.add(BlobListOption.delimiter(delimiter));
+ }
+ if (options != null && options.length > 0) {
+ for (BlobListOption option : options) {
+ blobListOptions.add(option);
+ }
+ }
+
+ try {
+ return storage.list(bucket, blobListOptions.toArray(new
BlobListOption[0]));
+ } catch (StorageException e) {
+ throw translateStorageException(bucket, prefix, e);
+ }
+ }
+
+ public Page<Blob> listBlobs(
+ String bucket, String prefix, @Nullable String pageToken,
BlobListOption... options)
+ throws IOException {
+ return listBlobs(bucket, prefix, pageToken, null, options);
+ }
+
+ /**
+ * Expands a pattern into matched paths. The pattern path may contain globs,
which are expanded in
+ * the result. For patterns that only match a single object, we ensure that
the object exists.
+ */
+ public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
+ // Handle Non-Wildcard Path
+ if (!GcsPath.isWildcard(gcsPattern)) {
+ try {
+ // Use a get request to fetch the metadata of the object, and ignore
the return value.
+ // The request has strong global consistency.
+ getBlob(gcsPattern, BlobGetOption.fields(BlobField.NAME));
+ return ImmutableList.of(gcsPattern);
+ } catch (FileNotFoundException e) {
+ // If the path was not found, return an empty list.
+ return ImmutableList.of();
+ }
+ }
+
+ // Handle Wildcard Path
+ // TODO: check out BlobListOption.matchGlob() for a similar function.
+ String prefix = GcsPath.getNonWildcardPrefix(gcsPattern.getObject());
+ Pattern p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject()));
+
+ LOG.debug(
+ "matching files in bucket {}, prefix {} against pattern {}",
+ gcsPattern.getBucket(),
+ prefix,
+ p.toString());
+
+ List<GcsPath> results = new ArrayList<>();
+ Page<Blob> blobs =
+ listBlobs(
+ gcsPattern.getBucket(),
+ prefix,
+ null,
+ BlobListOption.fields(BlobField.NAME, BlobField.BUCKET));
+ // Iterate through all elements page by page (lazily)
+ for (Blob b : blobs.iterateAll()) {
+ String name = b.getName();
+ // Filter objects based on the regex. Skip directories, which end with a
slash.
+ if (p.matcher(name).matches() && !name.endsWith("/")) {
+ LOG.debug("Matched object: {}", name);
+ results.add(GcsPath.fromComponents(b.getBucket(), b.getName()));
+ }
+ }
+ return results;
+ }
+
+ /** Gets the {@link Bucket} from Cloud Storage path or propagates an
exception. */
+ public Bucket getBucket(GcsPath path, BucketGetOption... options) throws
IOException {
+ String bucketName = path.getBucket();
+ try {
+ Bucket bucket = storage.get(bucketName, options);
+ if (bucket == null) {
+ throw new FileNotFoundException(
+ String.format("The specified bucket does not exist: gs://%s",
bucketName));
+ }
+ return bucket;
+ } catch (StorageException e) {
+ throw translateStorageException(bucketName, null, e);
+ }
+ }
+
+ /** Returns whether the GCS bucket exists and is accessible. */
+ public boolean bucketAccessible(GcsPath path) {
+ try {
+ // Fetch only the name field to minimize data transfer
+ getBucket(path, BucketGetOption.fields(BucketField.NAME));
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Checks whether the GCS bucket exists. Similar to {@link
#bucketAccessible(GcsPath)}, but throws
+ * an exception if the bucket is inaccessible due to permissions or does not
exist.
+ */
+ public void verifyBucketAccessible(GcsPath path) throws IOException {
+ // Fetch only the name field to minimize data transfer
+ getBucket(path, BucketGetOption.fields(BucketField.NAME));
+ }
+
+ /**
+ * Returns the project number of the project which owns this bucket. If the
bucket exists, it must
+ * be accessible; otherwise the permissions exception will be propagated. If
the bucket does not
+ * exist, an exception will be thrown.
+ */
+ public long bucketProject(GcsPath path) throws IOException {
+ Bucket bucket = getBucket(path,
BucketGetOption.fields(BucketField.PROJECT));
+ return bucket.getProject().longValue();
+ }
+
+ public void createBucket(BucketInfo bucketInfo) throws IOException {
+ try {
+ storage.create(bucketInfo);
+ } catch (StorageException e) {
+ throw translateStorageException(bucketInfo.getName(), null, e);
+ }
+ }
+
+ public void removeBucket(BucketInfo bucketInfo) throws IOException {
+ try {
+ if (!storage.delete(bucketInfo.getName())) {
+ throw new FileNotFoundException(
+ String.format("The specified bucket does not exist: gs://%s",
bucketInfo.getName()));
+ }
+ } catch (StorageException e) {
+ throw translateStorageException(bucketInfo.getName(), null, e);
+ }
+ }
+}
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java
index 29c312d7a5e..745ff4d3630 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPath.java
@@ -129,6 +129,9 @@ public class GcsPath implements Path, Serializable {
/** Pattern that is used to validate a GCS bucket name. */
private static final Pattern GCS_BUCKET_NAME =
Pattern.compile("[a-z0-9][-_a-z0-9.]+[a-z0-9]");
+ /** Matches a glob containing a wildcard, capturing the portion before the
first wildcard. */
+ private static final Pattern GLOB_PREFIX =
Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");
+
/** Creates a GcsPath from a OnePlatform resource name in string form. */
public static GcsPath fromResourceName(String name) {
Matcher m = GCS_RESOURCE_NAME.matcher(name);
@@ -605,4 +608,16 @@ public class GcsPath implements Path, Serializable {
return bucket + "/" + object;
}
}
+
+ /** Returns the prefix portion of the glob before the first wildcard
character. */
+ public static String getNonWildcardPrefix(String globExp) {
+ Matcher m = GLOB_PREFIX.matcher(globExp);
+ checkArgument(m.matches(), String.format("Glob expression: [%s] is not
expandable.", globExp));
+ return m.group("PREFIX");
+ }
+
+ /** Returns true if the given {@code spec} contains a wildcard. */
+ public static boolean isWildcard(GcsPath spec) {
+ return GLOB_PREFIX.matcher(spec.getObject()).matches();
+ }
}
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
index 89c60632084..8af5e2260fc 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
@@ -50,20 +50,31 @@ public class GcpCoreApiSurfaceTest {
@SuppressWarnings("unchecked")
final Set<Matcher<Class<?>>> allowedClasses =
ImmutableSet.of(
+ classesInPackage("com.fasterxml.jackson.annotation"),
classesInPackage("com.google.api.client.googleapis"),
classesInPackage("com.google.api.client.http"),
classesInPackage("com.google.api.client.json"),
classesInPackage("com.google.api.client.util"),
+ classesInPackage("com.google.api.core"),
+ classesInPackage("com.google.api.gax"),
classesInPackage("com.google.api.services.storage"),
classesInPackage("com.google.auth"),
- classesInPackage("com.fasterxml.jackson.annotation"),
+ classesInPackage("com.google.cloud"),
+ classesInPackage("com.google.errorprone"),
+ classesInPackage("com.google.iam"),
+ classesInPackage("com.google.protobuf"),
+ classesInPackage("com.google.storage"),
+ classesInPackage("com.google.type"),
classesInPackage("com.google.cloud.hadoop.gcsio"),
classesInPackage("com.google.common.collect"), // Via
gcs-connector ReadOptions builder
+ classesInPackage("io.grpc"),
+ classesInPackage("io.opentelemetry"),
classesInPackage("java"),
classesInPackage("javax"),
classesInPackage("org.apache.beam.model.pipeline.v1"),
classesInPackage("org.apache.beam.sdk"),
- classesInPackage("org.joda.time"));
+ classesInPackage("org.joda.time"),
+ classesInPackage("org.threeten.bp"));
assertThat(apiSurface, containsOnlyClassesMatching(allowedClasses));
}
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
new file mode 100644
index 00000000000..db5097c9515
--- /dev/null
+++
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.gax.paging.Page;
+import com.google.api.services.storage.model.Bucket;
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BucketInfo;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testing.UsesKms;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Integration tests for {@link GcsUtil}. These tests are designed to run
against production Google
+ * Cloud Storage.
+ *
+ * <p>This is a runnerless integration test, even though the Beam IT framework
assumes one. Thus,
+ * this test should only be run against a single runner (such as DirectRunner).
+ */
+@RunWith(Parameterized.class)
+@Category(UsesKms.class)
+public class GcsUtilParameterizedIT {
+
+  /** Supplies the experiment names the suite is parameterized over, one run per name. */
+  @Parameters(name = "{0}")
+  public static Iterable<String> data() {
+    // setUp() installs the current name as the only pipeline experiment.
+    final List<String> experiments = Arrays.asList("use_gcsutil_v1", "use_gcsutil_v2");
+    return experiments;
+  }
+
+ @Parameter public String experiment;
+
+ private TestPipelineOptions options;
+ private GcsUtil gcsUtil;
+
+  /** Creates test pipeline options carrying {@link #experiment} and fetches the GcsUtil. */
+  @Before
+  public void setUp() {
+    options = TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
+
+    // The experiment list is replaced with exactly the one flag under test.
+    options.as(ExperimentalOptions.class).setExperiments(Collections.singletonList(experiment));
+
+    gcsUtil = options.as(GcsOptions.class).getGcsUtil();
+  }
+
+  /** Checks that fileSize() reports the expected size for a known sample object. */
+  @Test
+  public void testFileSize() throws IOException {
+    final GcsPath kingLear = GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt");
+
+    assertEquals(157283L, gcsUtil.fileSize(kingLear));
+  }
+
+  /**
+   * Fetches one existing object and compares its CRC32C, then checks which exception types are
+   * thrown for a nonexistent path and for a forbidden path under the selected implementation.
+   */
+  @Test
+  public void testGetObjectOrGetBlob() throws IOException {
+    final boolean useV2 = experiment.equals("use_gcsutil_v2");
+    final GcsPath existingPath =
+        GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt");
+
+    final String crc;
+    if (!useV2) {
+      final StorageObject storageObject = gcsUtil.getObject(existingPath);
+      crc = storageObject.getCrc32c();
+    } else {
+      final Blob blob = gcsUtil.getBlob(existingPath);
+      crc = blob.getCrc32c();
+    }
+    assertEquals("s0a3Tg==", crc);
+
+    final GcsPath nonExistentPath =
+        GcsPath.fromUri("gs://my-random-test-bucket-12345/unknown-12345.txt");
+    final GcsPath forbiddenPath = GcsPath.fromUri("gs://test-bucket/unknown-12345.txt");
+
+    if (!useV2) {
+      assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(nonExistentPath));
+      assertThrows(IOException.class, () -> gcsUtil.getObject(forbiddenPath));
+      return;
+    }
+
+    assertThrows(FileNotFoundException.class, () -> gcsUtil.getBlob(nonExistentPath));
+    // For V2, we are returning AccessDeniedException (a subclass of IOException) for forbidden
+    // paths.
+    assertThrows(AccessDeniedException.class, () -> gcsUtil.getBlob(forbiddenPath));
+  }
+
+  /**
+   * Looks up an existing and a nonexistent path in one batch call and verifies that the first
+   * result carries only a payload while the second carries only an IOException.
+   */
+  @Test
+  public void testGetObjectsOrGetBlobs() throws IOException {
+    final GcsPath existingPath =
+        GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt");
+    final GcsPath nonExistentPath =
+        GcsPath.fromUri("gs://my-random-test-bucket-12345/unknown-12345.txt");
+    final List<GcsPath> paths = Arrays.asList(existingPath, nonExistentPath);
+
+    // assertNotNull/assertNull are used instead of assertTrue(x != null) so that a failure
+    // reports the unexpected value rather than a bare AssertionError.
+    if (experiment.equals("use_gcsutil_v2")) {
+      List<GcsUtilV2.BlobResult> results = gcsUtil.getBlobs(paths);
+      assertEquals(2, results.size());
+      assertNotNull(results.get(0).blob());
+      assertNull(results.get(0).ioException());
+      assertNull(results.get(1).blob());
+      assertNotNull(results.get(1).ioException());
+    } else {
+      List<GcsUtil.StorageObjectOrIOException> results = gcsUtil.getObjects(paths);
+      assertEquals(2, results.size());
+      assertNotNull(results.get(0).storageObject());
+      assertNull(results.get(0).ioException());
+      assertNull(results.get(1).storageObject());
+      assertNotNull(results.get(1).ioException());
+    }
+  }
+
+  /**
+   * Lists a prefix that is expected to match two sample files, then a prefix expected to match
+   * nothing, using listBlobs() for V2 and listObjects() for V1.
+   */
+  @Test
+  public void testListObjectsOrListBlobs() throws IOException {
+    final boolean useV2 = experiment.equals("use_gcsutil_v2");
+    final String bucket = "apache-beam-samples";
+    final String matchingPrefix = "shakespeare/kingrichard";
+
+    final List<String> names;
+    if (useV2) {
+      final Page<Blob> blobs = gcsUtil.listBlobs(bucket, matchingPrefix, null);
+      names = blobs.streamAll().map(Blob::getName).collect(Collectors.toList());
+    } else {
+      final Objects objs = gcsUtil.listObjects(bucket, matchingPrefix, null);
+      names = objs.getItems().stream().map(StorageObject::getName).collect(Collectors.toList());
+    }
+    final List<String> expectedNames =
+        Arrays.asList("shakespeare/kingrichardii.txt", "shakespeare/kingrichardiii.txt");
+    assertEquals(expectedNames, names);
+
+    final String randomPrefix = "my-random-prefix/random";
+    if (useV2) {
+      final Page<Blob> noBlobs = gcsUtil.listBlobs(bucket, randomPrefix, null);
+      assertEquals(0, noBlobs.streamAll().count());
+    } else {
+      // The V1 assertion expects a null item list, not an empty one, when nothing matches.
+      final Objects noObjs = gcsUtil.listObjects(bucket, randomPrefix, null);
+      assertEquals(null, noObjs.getItems());
+    }
+  }
+
+  /**
+   * Expands one glob that should resolve to two sample files and two globs that should resolve
+   * to nothing.
+   */
+  @Test
+  public void testExpand() throws IOException {
+    final List<GcsPath> expected =
+        Arrays.asList(
+            GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii.txt"),
+            GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardiii.txt"));
+    final GcsPath existingPattern =
+        GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii*.txt");
+    assertEquals(expected, gcsUtil.expand(existingPattern));
+
+    // First a folder that is not there, then an extension no file under the folder carries.
+    final List<String> unmatchedPatterns =
+        Arrays.asList(
+            "gs://apache-beam-samples/my_random_folder/random*.txt",
+            "gs://apache-beam-samples/shakespeare/king*.csv");
+    for (String pattern : unmatchedPatterns) {
+      assertTrue(gcsUtil.expand(GcsPath.fromUri(pattern)).isEmpty());
+    }
+  }
+
+  /**
+   * Reads the name of an existing bucket and checks the exceptions thrown for a nonexistent and
+   * a forbidden bucket, via getBucketWithOptions() for V2 and getBucket() for V1.
+   */
+  @Test
+  public void testGetBucketOrGetBucketWithOptions() throws IOException {
+    final boolean useV2 = experiment.equals("use_gcsutil_v2");
+    final GcsPath existingPath = GcsPath.fromUri("gs://apache-beam-samples");
+
+    final String bucketName =
+        useV2
+            ? gcsUtil.getBucketWithOptions(existingPath).getName()
+            : gcsUtil.getBucket(existingPath).getName();
+    assertEquals("apache-beam-samples", bucketName);
+
+    final GcsPath missing = GcsPath.fromUri("gs://my-random-test-bucket-12345");
+    final GcsPath forbidden = GcsPath.fromUri("gs://test-bucket");
+
+    if (!useV2) {
+      assertThrows(FileNotFoundException.class, () -> gcsUtil.getBucket(missing));
+      assertThrows(AccessDeniedException.class, () -> gcsUtil.getBucket(forbidden));
+      return;
+    }
+
+    assertThrows(FileNotFoundException.class, () -> gcsUtil.getBucketWithOptions(missing));
+    assertThrows(AccessDeniedException.class, () -> gcsUtil.getBucketWithOptions(forbidden));
+  }
+
+  /**
+   * Verifies that bucketAccessible() returns true for the existing bucket and false for the
+   * nonexistent and the forbidden one.
+   */
+  @Test
+  public void testBucketAccessible() throws IOException {
+    final GcsPath existingPath = GcsPath.fromUri("gs://apache-beam-samples");
+    final GcsPath nonExistentPath = GcsPath.fromUri("gs://my-random-test-bucket-12345");
+    final GcsPath forbiddenPath = GcsPath.fromUri("gs://test-bucket");
+
+    // Boolean results are checked with assertTrue/assertFalse rather than
+    // assertEquals(true/false, ...), matching the other tests in this class.
+    assertTrue(gcsUtil.bucketAccessible(existingPath));
+    assertFalse(gcsUtil.bucketAccessible(nonExistentPath));
+    assertFalse(gcsUtil.bucketAccessible(forbiddenPath));
+  }
+
+  /**
+   * Compares bucketOwner() for an existing bucket against a fixed project number and checks the
+   * exceptions thrown for a nonexistent and a forbidden bucket.
+   */
+  @Test
+  public void testBucketOwner() throws IOException {
+    final GcsPath existingPath = GcsPath.fromUri("gs://apache-beam-samples");
+    final GcsPath missing = GcsPath.fromUri("gs://my-random-test-bucket-12345");
+    final GcsPath forbidden = GcsPath.fromUri("gs://test-bucket");
+
+    final long apacheBeamTestingProjectNumber = 844138762903L; // apache-beam-testing
+    assertEquals(apacheBeamTestingProjectNumber, gcsUtil.bucketOwner(existingPath));
+
+    assertThrows(FileNotFoundException.class, () -> gcsUtil.bucketOwner(missing));
+    assertThrows(AccessDeniedException.class, () -> gcsUtil.bucketOwner(forbidden));
+  }
+
+  /**
+   * Creates a bucket, checks that a second creation fails, removes it, and checks that a second
+   * removal fails. The V2 branch drives the BucketInfo-based API; the V1 branch drives the
+   * Bucket-based API together with the project id taken from the pipeline options.
+   */
+  @Test
+  public void testCreateAndRemoveBucket() throws IOException {
+    final GcsPath gcsPath = GcsPath.fromUri("gs://apache-beam-test-bucket-12345");
+
+    if (experiment.equals("use_gcsutil_v2")) {
+      BucketInfo bucketInfo = BucketInfo.of(gcsPath.getBucket());
+      try {
+        assertFalse(gcsUtil.bucketAccessible(gcsPath));
+        gcsUtil.createBucket(bucketInfo);
+        assertTrue(gcsUtil.bucketAccessible(gcsPath));
+
+        // raise exception when the bucket already exists during creation
+        assertThrows(FileAlreadyExistsException.class, () -> gcsUtil.createBucket(bucketInfo));
+
+        assertTrue(gcsUtil.bucketAccessible(gcsPath));
+        gcsUtil.removeBucket(bucketInfo);
+        assertFalse(gcsUtil.bucketAccessible(gcsPath));
+
+        // raise exception when the bucket does not exist during removal
+        assertThrows(FileNotFoundException.class, () -> gcsUtil.removeBucket(bucketInfo));
+      } finally {
+        // Best-effort cleanup in case an assertion above aborted before the bucket was removed.
+        try {
+          gcsUtil.removeBucket(bucketInfo);
+        } catch (IOException ignored) {
+          // Expected when the test body already removed the bucket; a cleanup failure must not
+          // mask the original test outcome.
+        }
+      }
+    } else {
+      Bucket bucket = new Bucket().setName(gcsPath.getBucket());
+      GcsOptions gcsOptions = options.as(GcsOptions.class);
+      String projectId = gcsOptions.getProject();
+      try {
+        assertFalse(gcsUtil.bucketAccessible(gcsPath));
+        gcsUtil.createBucket(projectId, bucket);
+        assertTrue(gcsUtil.bucketAccessible(gcsPath));
+
+        // raise exception when the bucket already exists during creation
+        assertThrows(
+            FileAlreadyExistsException.class, () -> gcsUtil.createBucket(projectId, bucket));
+
+        assertTrue(gcsUtil.bucketAccessible(gcsPath));
+        gcsUtil.removeBucket(bucket);
+        assertFalse(gcsUtil.bucketAccessible(gcsPath));
+
+        // raise exception when the bucket does not exist during removal
+        assertThrows(FileNotFoundException.class, () -> gcsUtil.removeBucket(bucket));
+      } finally {
+        // Best-effort cleanup in case an assertion above aborted before the bucket was removed.
+        try {
+          gcsUtil.removeBucket(bucket);
+        } catch (IOException ignored) {
+          // Expected when the test body already removed the bucket; a cleanup failure must not
+          // mask the original test outcome.
+        }
+      }
+    }
+  }
+}
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
index ea047b0c6ea..f344c6c2dba 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
@@ -348,4 +348,24 @@ public class GcsPathTest {
a.subpath(1, 1); // throws IllegalArgumentException
Assert.fail();
}
+
+  @Test
+  public void testIsWildcard() {
+    // Paths ending in '*', '?' or a bracket expression are each expected to count as wildcards.
+    final String[] globs = {"gs://bucket/foo*", "gs://bucket/foo?", "gs://bucket/foo[a-z]"};
+    for (String glob : globs) {
+      assertTrue(GcsPath.isWildcard(GcsPath.fromUri(glob)));
+    }
+
+    assertFalse(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo")));
+  }
+
+  @Test
+  public void testGetNonWildcardPrefix() {
+    // Whatever wildcard follows it, the expected prefix is the text before the wildcard.
+    final String expectedPrefix = "gs://bucket/foo";
+    final String[] globs = {"gs://bucket/foo*", "gs://bucket/foo?", "gs://bucket/foo[a-z]"};
+    for (String glob : globs) {
+      assertEquals(expectedPrefix, GcsPath.getNonWildcardPrefix(glob));
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetNonWildcardPrefix_noWildcard() {
+    // The argument has no wildcard character; the test passes only if the call throws.
+    final String plainPath = "gs://bucket/foo/bar";
+    GcsPath.getNonWildcardPrefix(plainPath);
+  }
}