This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 74fc46c404d GCS client library migration in Java SDK - part 2b (#37592)
74fc46c404d is described below
commit 74fc46c404d0e76d89f01d144d4038ba94dd5677
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Feb 18 15:26:33 2026 -0500
GCS client library migration in Java SDK - part 2b (#37592)
* Add copy() and remove() for GcsUtil V2.
* Add tests and modify copy and remove to take strategies
* Add deprecated annotations to V1 copy and remove.
* Refactor MissingStrategy and OverwriteStrategy enums. Add rewriteHelper()
and move().
* Add rename tests and refactor copy and remove tests.
* Refactor rename
* Add experimental annotations to the new copy, remove and rename
* Remove unused import.
* Fix style.
* Trigger post commit java for the integration tests of GcsUtil.
* Revise according to reviews.
---
.github/trigger_files/beam_PostCommit_Java.json | 2 +-
.../beam/sdk/extensions/gcp/util/GcsUtil.java | 72 +++++
.../beam/sdk/extensions/gcp/util/GcsUtilV2.java | 161 +++++++++++-
.../gcp/util/GcsUtilParameterizedIT.java | 290 +++++++++++++++++++++
.../sdk/extensions/gcp/util/gcsfs/GcsPathTest.java | 2 +
5 files changed, 525 insertions(+), 2 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Java.json
b/.github/trigger_files/beam_PostCommit_Java.json
index 756b765e59e..7b4c1ba6702 100644
--- a/.github/trigger_files/beam_PostCommit_Java.json
+++ b/.github/trigger_files/beam_PostCommit_Java.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run",
- "modification": 4
+ "modification": 6
}
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index a2fdf24e9fb..33399ef87b6 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -37,16 +37,21 @@ import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.BlobResult;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.nullness.qual.Nullable;
public class GcsUtil {
@@ -433,12 +438,65 @@ public class GcsUtil {
delegate.copy(srcFilenames, destFilenames);
}
+  /**
+   * Copies each object in {@code srcPaths} to the destination at the same position in {@code
+   * dstPaths} using the GcsUtil V2 client, with {@link OverwriteStrategy#SAFE_OVERWRITE} as the
+   * policy for destinations that already exist.
+   *
+   * <p>Experimental API.
+   */
+  public void copyV2(Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths)
+      throws IOException {
+    final OverwriteStrategy defaultStrategy = OverwriteStrategy.SAFE_OVERWRITE;
+    this.copy(srcPaths, dstPaths, defaultStrategy);
+  }
+
+  /**
+   * Copies each object in {@code srcPaths} to the destination at the same position in {@code
+   * dstPaths} using the GcsUtil V2 client, resolving existing destinations per {@code strategy}.
+   *
+   * <p>Experimental API.
+   *
+   * @throws IOException if GcsUtil V2 has not been initialized, or if a copy fails
+   */
+  public void copy(
+      Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths, OverwriteStrategy strategy)
+      throws IOException {
+    if (delegateV2 == null) {
+      throw new IOException("GcsUtil V2 not initialized.");
+    }
+    delegateV2.copy(srcPaths, dstPaths, strategy);
+  }
+
+  /**
+   * Renames files given as string paths by forwarding to {@code delegate}; {@link #renameV2} is
+   * the {@link GcsPath}-based variant.
+   */
public void rename(
Iterable<String> srcFilenames, Iterable<String> destFilenames,
MoveOptions... moveOptions)
throws IOException {
delegate.rename(srcFilenames, destFilenames, moveOptions);
}
+  /**
+   * Renames objects with the GcsUtil V2 client, translating Beam {@link MoveOptions} into V2
+   * strategies.
+   *
+   * <p>{@link StandardMoveOptions#IGNORE_MISSING_FILES} maps to {@link
+   * MissingStrategy#SKIP_IF_MISSING} (otherwise {@link MissingStrategy#FAIL_IF_MISSING}); {@link
+   * StandardMoveOptions#SKIP_IF_DESTINATION_EXISTS} maps to {@link
+   * OverwriteStrategy#SKIP_IF_EXISTS} (otherwise {@link OverwriteStrategy#SAFE_OVERWRITE}).
+   *
+   * <p>Experimental API.
+   */
+  public void renameV2(
+      Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths, MoveOptions... moveOptions)
+      throws IOException {
+    final Set<MoveOptions> requested = Sets.newHashSet(moveOptions);
+    final MissingStrategy srcMissing =
+        requested.contains(StandardMoveOptions.IGNORE_MISSING_FILES)
+            ? MissingStrategy.SKIP_IF_MISSING
+            : MissingStrategy.FAIL_IF_MISSING;
+    final OverwriteStrategy dstOverwrite =
+        requested.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS)
+            ? OverwriteStrategy.SKIP_IF_EXISTS
+            : OverwriteStrategy.SAFE_OVERWRITE;
+    rename(srcPaths, dstPaths, srcMissing, dstOverwrite);
+  }
+
+  /**
+   * Moves each object in {@code srcPaths} to the destination at the same position in {@code
+   * dstPaths} using the GcsUtil V2 client.
+   *
+   * <p>Experimental API.
+   *
+   * @param srcMissing how to treat a source object that does not exist
+   * @param dstOverwrite how to treat a destination object that already exists
+   * @throws IOException if GcsUtil V2 has not been initialized, or if a move fails
+   */
+  public void rename(
+      Iterable<GcsPath> srcPaths,
+      Iterable<GcsPath> dstPaths,
+      MissingStrategy srcMissing,
+      OverwriteStrategy dstOverwrite)
+      throws IOException {
+    if (delegateV2 == null) {
+      throw new IOException("GcsUtil V2 not initialized.");
+    }
+    delegateV2.move(srcPaths, dstPaths, srcMissing, dstOverwrite);
+  }
+
@VisibleForTesting
@SuppressWarnings("JdkObsolete") // for LinkedList
java.util.LinkedList<GcsUtilV1.RewriteOp> makeRewriteOps(
@@ -469,6 +527,20 @@ public class GcsUtil {
delegate.remove(filenames);
}
+  /**
+   * Deletes the given objects with the GcsUtil V2 client, using {@link
+   * MissingStrategy#SKIP_IF_MISSING} for objects that do not exist.
+   *
+   * <p>Experimental API.
+   */
+  public void removeV2(Iterable<GcsPath> paths) throws IOException {
+    final MissingStrategy lenient = MissingStrategy.SKIP_IF_MISSING;
+    this.remove(paths, lenient);
+  }
+
+  /**
+   * Deletes the given objects with the GcsUtil V2 client; {@code strategy} decides whether a
+   * missing object is an error.
+   *
+   * <p>Experimental API.
+   *
+   * @throws IOException if GcsUtil V2 has not been initialized, or if a deletion fails
+   */
+  public void remove(Iterable<GcsPath> paths, MissingStrategy strategy) throws IOException {
+    if (delegateV2 == null) {
+      throw new IOException("GcsUtil V2 not initialized.");
+    }
+    delegateV2.remove(paths, strategy);
+  }
+
@SuppressFBWarnings("NM_CLASS_NOT_EXCEPTION")
public static class StorageObjectOrIOException {
final GcsUtilV1.StorageObjectOrIOException delegate;
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
index a2df45511c9..b00b7ce0d72 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
@@ -18,19 +18,24 @@
package org.apache.beam.sdk.extensions.gcp.util;
import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import com.google.api.gax.paging.Page;
import com.google.auto.value.AutoValue;
import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.CopyWriter;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobField;
import com.google.cloud.storage.Storage.BlobGetOption;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.Storage.BucketField;
import com.google.cloud.storage.Storage.BucketGetOption;
+import com.google.cloud.storage.Storage.CopyRequest;
import com.google.cloud.storage.StorageBatch;
import com.google.cloud.storage.StorageBatchResult;
import com.google.cloud.storage.StorageException;
@@ -71,18 +76,27 @@ class GcsUtilV2 {
/** Maximum number of requests permitted in a GCS batch request. */
private static final int MAX_REQUESTS_PER_BATCH = 100;
+  /**
+   * Upper bound, in megabytes, on how much data Cloud Storage copies for a single rewrite request
+   * before responding; passed to {@code CopyRequest.Builder#setMegabytesCopiedPerChunk}. If you
+   * see Read Timeout errors, try reducing this value.
+   */
+ private static final long MEGABYTES_COPIED_PER_CHUNK = 2048L;
+
+  /**
+   * Creates the Cloud Storage client used by this class, bound to the project returned by {@link
+   * GcpOptions#getProject()}.
+   */
GcsUtilV2(PipelineOptions options) {
String projectId = options.as(GcpOptions.class).getProject();
storage =
StorageOptions.newBuilder().setProjectId(projectId).build().getService();
}
@SuppressWarnings({
- "nullness" // For Creating AccessDeniedException and
FileAlreadyExistsException with null.
+ "nullness" // For Creating AccessDeniedException FileNotFoundException, and
+ // FileAlreadyExistsException with null.
})
private IOException translateStorageException(GcsPath gcsPath,
StorageException e) {
switch (e.getCode()) {
case 403:
return new AccessDeniedException(gcsPath.toString(), null,
e.getMessage());
+ case 404:
+ return new FileNotFoundException(e.getMessage());
case 409:
return new FileAlreadyExistsException(gcsPath.toString(), null,
e.getMessage());
default:
@@ -259,6 +273,151 @@ class GcsUtilV2 {
return results;
}
+  /** How an operation treats a source object that does not exist. */
+  public enum MissingStrategy {
+    /** Report the missing object with a {@link FileNotFoundException}. */
+    FAIL_IF_MISSING,
+    /** Log a warning and continue with the remaining objects. */
+    SKIP_IF_MISSING
+  }
+
+  /**
+   * Deletes the given objects using batched requests of at most {@code MAX_REQUESTS_PER_BATCH}
+   * deletes each.
+   *
+   * <p>A missing object, including one whose bucket does not exist, is handled per {@code
+   * strategy}. Deletions already submitted in the same batch are not rolled back when an error is
+   * raised.
+   *
+   * @param paths objects to delete
+   * @param strategy what to do when an object does not exist
+   * @throws FileNotFoundException if an object is missing and {@code strategy} is {@link
+   *     MissingStrategy#FAIL_IF_MISSING}
+   * @throws IOException if any other deletion fails
+   */
+  public void remove(Iterable<GcsPath> paths, MissingStrategy strategy) throws IOException {
+    for (List<GcsPath> pathPartition :
+        Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {
+
+      // Use a fresh batch per partition so each submission carries at most
+      // MAX_REQUESTS_PER_BATCH deletes.
+      StorageBatch batch = storage.batch();
+      List<StorageBatchResult<Boolean>> batchResultFutures =
+          new ArrayList<>(pathPartition.size());
+
+      for (GcsPath path : pathPartition) {
+        batchResultFutures.add(batch.delete(path.getBucket(), path.getObject()));
+      }
+      batch.submit();
+
+      for (int i = 0; i < batchResultFutures.size(); i++) {
+        GcsPath path = pathPartition.get(i);
+        boolean deleted;
+        try {
+          // Compare against TRUE so a null result cannot cause an unboxing NPE.
+          deleted = Boolean.TRUE.equals(batchResultFutures.get(i).get());
+        } catch (StorageException e) {
+          if (e.getCode() != 404) {
+            throw translateStorageException(path, e);
+          }
+          // A 404 (e.g. the bucket itself is missing) means the object does not exist either, so
+          // it must follow the same MissingStrategy as a "not deleted" result.
+          deleted = false;
+        }
+
+        if (!deleted) {
+          if (strategy == MissingStrategy.FAIL_IF_MISSING) {
+            throw new FileNotFoundException(
+                String.format("The specified file does not exist: %s", path.toString()));
+          }
+          LOG.warn("Ignoring failed deletion on file {}.", path.toString());
+        }
+      }
+    }
+  }
+
+  /** How a copy or move treats a destination object that already exists. */
+  public enum OverwriteStrategy {
+    /** Fail with a {@link FileAlreadyExistsException} if the target exists. */
+    FAIL_IF_EXISTS,
+    /** Leave an existing target untouched and skip this pair. */
+    SKIP_IF_EXISTS,
+    /** Overwrite only if the target's generation still matches the one observed (atomic). */
+    SAFE_OVERWRITE,
+    /** Overwrite the target regardless of its state. */
+    ALWAYS_OVERWRITE
+  }
+
+  /**
+   * Copies each source object to the destination at the same position using the Cloud Storage
+   * rewrite API. When {@code deleteSrc} is set, each source is deleted after it has been copied
+   * (a move).
+   *
+   * @param srcPaths objects to copy from
+   * @param dstPaths objects to copy to, paired with {@code srcPaths} by position
+   * @param deleteSrc whether to delete each source after it has been copied
+   * @param srcMissing how to handle a source object that does not exist
+   * @param dstOverwrite how to handle a destination object that already exists
+   * @throws IllegalArgumentException if the two path lists differ in length
+   * @throws FileAlreadyExistsException if a destination exists and {@code dstOverwrite} is {@link
+   *     OverwriteStrategy#FAIL_IF_EXISTS}
+   * @throws FileNotFoundException if a source is missing and {@code srcMissing} is {@link
+   *     MissingStrategy#FAIL_IF_MISSING}, or if the destination bucket does not exist
+   * @throws IOException if any other Cloud Storage call fails
+   */
+  private void rewriteHelper(
+      Iterable<GcsPath> srcPaths,
+      Iterable<GcsPath> dstPaths,
+      boolean deleteSrc,
+      MissingStrategy srcMissing,
+      OverwriteStrategy dstOverwrite)
+      throws IOException {
+    List<GcsPath> srcList = Lists.newArrayList(srcPaths);
+    List<GcsPath> dstList = Lists.newArrayList(dstPaths);
+    checkArgument(
+        srcList.size() == dstList.size(),
+        "Number of source files %s must equal number of destination files %s",
+        srcList.size(),
+        dstList.size());
+
+    for (int i = 0; i < srcList.size(); i++) {
+      GcsPath srcPath = srcList.get(i);
+      GcsPath dstPath = dstList.get(i);
+      BlobId srcId = BlobId.of(srcPath.getBucket(), srcPath.getObject());
+      BlobId dstId = BlobId.of(dstPath.getBucket(), dstPath.getObject());
+
+      CopyRequest.Builder copyRequestBuilder =
+          CopyRequest.newBuilder()
+              .setSource(srcId)
+              .setMegabytesCopiedPerChunk(MEGABYTES_COPIED_PER_CHUNK);
+
+      if (dstOverwrite == OverwriteStrategy.ALWAYS_OVERWRITE) {
+        copyRequestBuilder.setTarget(dstId);
+      } else {
+        // FAIL_IF_EXISTS, SKIP_IF_EXISTS and SAFE_OVERWRITE require checking the target blob.
+        BlobInfo existingTarget;
+        try {
+          existingTarget = storage.get(dstId);
+        } catch (StorageException e) {
+          throw translateStorageException(dstPath, e);
+        }
+
+        if (existingTarget == null) {
+          // Precondition: fail rather than clobber a target created between check and copy.
+          copyRequestBuilder.setTarget(dstId, Storage.BlobTargetOption.doesNotExist());
+        } else {
+          switch (dstOverwrite) {
+            case SKIP_IF_EXISTS:
+              LOG.warn("Ignoring rewriting from {} to {} because target exists.", srcPath, dstPath);
+              continue; // Skip to next file in for-loop
+
+            case SAFE_OVERWRITE:
+              // Only replace the exact generation observed above.
+              copyRequestBuilder.setTarget(
+                  dstId, Storage.BlobTargetOption.generationMatch(existingTarget.getGeneration()));
+              break;
+
+            case FAIL_IF_EXISTS:
+              // The object that already exists is the destination, so it is reported as the file.
+              throw new FileAlreadyExistsException(
+                  dstPath.toString(),
+                  srcPath.toString(),
+                  "Target object already exists and strategy is FAIL_IF_EXISTS");
+            default:
+              throw new IllegalStateException("Unknown OverwriteStrategy: " + dstOverwrite);
+          }
+        }
+      }
+
+      try {
+        CopyWriter copyWriter = storage.copy(copyRequestBuilder.build());
+        copyWriter.getResult();
+      } catch (StorageException e) {
+        if (e.getCode() != 404) {
+          throw translateStorageException(srcPath, e);
+        }
+        // A 404 from a rewrite means either the source object or the destination bucket is
+        // missing. Only a missing source is governed by srcMissing; a missing destination bucket
+        // must never be skipped silently.
+        if (rewriteSourceExists(srcId, srcPath)) {
+          throw translateStorageException(dstPath, e);
+        }
+        if (srcMissing == MissingStrategy.SKIP_IF_MISSING) {
+          LOG.warn(
+              "Ignoring rewriting from {} to {} because source does not exist.", srcPath, dstPath);
+          continue;
+        }
+        throw translateStorageException(srcPath, e);
+      }
+
+      // Moving an object onto itself must not delete the only remaining copy.
+      if (deleteSrc && !srcId.equals(dstId)) {
+        boolean deleted;
+        try {
+          deleted = storage.delete(srcId);
+        } catch (StorageException e) {
+          throw translateStorageException(srcPath, e);
+        }
+        if (!deleted) {
+          // This may happen if the source file is deleted by another process after copy.
+          LOG.warn(
+              "Source file {} could not be deleted after move to {}. It may not have existed.",
+              srcPath,
+              dstPath);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns whether the rewrite source {@code srcId} currently exists. A 404 from the lookup
+   * counts as "does not exist"; other lookup failures are rethrown as {@link IOException}.
+   */
+  private boolean rewriteSourceExists(BlobId srcId, GcsPath srcPath) throws IOException {
+    try {
+      return storage.get(srcId) != null;
+    } catch (StorageException e) {
+      if (e.getCode() == 404) {
+        return false;
+      }
+      throw translateStorageException(srcPath, e);
+    }
+  }
+
+  /**
+   * Copies each source object to the destination at the same position, keeping the sources. A
+   * missing source is always an error; an existing destination is handled per {@code strategy}.
+   */
+  public void copy(
+      Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths, OverwriteStrategy strategy)
+      throws IOException {
+    rewriteHelper(
+        srcPaths,
+        dstPaths,
+        /* deleteSrc= */ false,
+        /* srcMissing= */ MissingStrategy.FAIL_IF_MISSING,
+        /* dstOverwrite= */ strategy);
+  }
+
+  /**
+   * Moves each source object to the destination at the same position by copying it and then
+   * deleting the source.
+   *
+   * @param srcMissing how to handle a source object that does not exist
+   * @param dstOverwrite how to handle a destination object that already exists
+   */
+  public void move(
+      Iterable<GcsPath> srcPaths,
+      Iterable<GcsPath> dstPaths,
+      MissingStrategy srcMissing,
+      OverwriteStrategy dstOverwrite)
+      throws IOException {
+    final boolean deleteSrc = true;
+    rewriteHelper(srcPaths, dstPaths, deleteSrc, srcMissing, dstOverwrite);
+  }
+
/** Get the {@link Bucket} from Cloud Storage path or propagates an
exception. */
public Bucket getBucket(GcsPath path, BucketGetOption... options) throws
IOException {
String bucketName = path.getBucket();
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
index db5097c9515..80ffd72924f 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
@@ -37,7 +37,10 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
@@ -297,4 +300,291 @@ public class GcsUtilParameterizedIT {
}
}
}
+
+  /**
+   * Creates {@code bucketName} and seeds it with two texts copied from the public
+   * apache-beam-samples bucket, using the API selected by {@code experiment}.
+   *
+   * @return the paths of the seeded objects inside {@code bucketName}
+   */
+  private List<GcsPath> createTestBucketHelper(String bucketName) throws IOException {
+    final List<GcsPath> originPaths =
+        Arrays.asList(
+            GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii.txt"),
+            GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardiii.txt"));
+
+    // Same object names, relocated into the test bucket.
+    final List<GcsPath> testPaths =
+        originPaths.stream()
+            .map(GcsPath::getObject)
+            .map(objectName -> GcsPath.fromComponents(bucketName, objectName))
+            .collect(Collectors.toList());
+
+    if (!experiment.equals("use_gcsutil_v2")) {
+      // V1 API: the bucket is created in the configured project and copies take string paths.
+      gcsUtil.createBucket(
+          options.as(GcsOptions.class).getProject(), new Bucket().setName(bucketName));
+      gcsUtil.copy(
+          originPaths.stream().map(GcsPath::toString).collect(Collectors.toList()),
+          testPaths.stream().map(GcsPath::toString).collect(Collectors.toList()));
+    } else {
+      gcsUtil.createBucket(BucketInfo.of(bucketName));
+      gcsUtil.copyV2(originPaths, testPaths);
+    }
+    return testPaths;
+  }
+
+  /**
+   * Best-effort cleanup. Deletes every object in {@code bucketName} and then the bucket itself. An
+   * {@link IOException} is reported on stderr instead of being thrown, so that it does not mask
+   * the outcome of the test.
+   */
+  private void tearDownTestBucketHelper(String bucketName) {
+    try {
+      // "**" matches any characters, including "/", so nested objects are listed too.
+      final GcsPath everything = GcsPath.fromUri(String.format("gs://%s/**", bucketName));
+      final List<GcsPath> paths = gcsUtil.expand(everything);
+      if (!experiment.equals("use_gcsutil_v2")) {
+        final List<String> names =
+            paths.stream().map(GcsPath::toString).collect(Collectors.toList());
+        gcsUtil.remove(names);
+        gcsUtil.removeBucket(new Bucket().setName(bucketName));
+      } else {
+        gcsUtil.remove(paths, MissingStrategy.SKIP_IF_MISSING);
+        gcsUtil.removeBucket(BucketInfo.of(bucketName));
+      }
+    } catch (IOException e) {
+      System.err.println(
+          "Error during tear down of test bucket " + bucketName + ": " + e.getMessage());
+    }
+  }
+
+ /**
+ * Integration test for copy. Seeds a temporary bucket with two objects, copies them to ".bak"
+ * names, and checks overwrite handling. It also checks the failures for a missing destination
+ * bucket and for missing sources.
+ *
+ * <p>NOTE(review): the bucket names are fixed literals shared with testRemove and testRename, so
+ * concurrent runs (e.g. parameterizations executing in parallel) could interfere. Confirm these
+ * tests run serially.
+ */
+ @Test
+ public void testCopy() throws IOException {
+ final String existingBucket = "apache-beam-temp-bucket-12345";
+ // Presumed not to exist; paths into it exercise the missing-bucket error paths.
+ final String nonExistentBucket = "my-random-test-bucket-12345";
+
+ try {
+ final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket);
+ final List<GcsPath> dstPaths =
+ srcPaths.stream()
+ .map(o -> GcsPath.fromComponents(existingBucket, o.getObject() +
".bak"))
+ .collect(Collectors.toList());
+ final List<GcsPath> errPaths =
+ srcPaths.stream()
+ .map(o -> GcsPath.fromComponents(nonExistentBucket,
o.getObject()))
+ .collect(Collectors.toList());
+
+ assertNotExists(dstPaths.get(0));
+ assertNotExists(dstPaths.get(1));
+
+ if (experiment.equals("use_gcsutil_v2")) {
+ // (1) when the target files do not exist
+ gcsUtil.copyV2(srcPaths, dstPaths);
+ assertExists(dstPaths.get(0));
+ assertExists(dstPaths.get(1));
+
+ // (2) when the target files exist
+ // (2a) no exception on SAFE_OVERWRITE (copyV2's default), ALWAYS_OVERWRITE, SKIP_IF_EXISTS
+ gcsUtil.copyV2(srcPaths, dstPaths);
+ gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.ALWAYS_OVERWRITE);
+ gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.SKIP_IF_EXISTS);
+
+ // (2b) raise exception on FAIL_IF_EXISTS
+ assertThrows(
+ FileAlreadyExistsException.class,
+ () -> gcsUtil.copy(srcPaths, dstPaths,
OverwriteStrategy.FAIL_IF_EXISTS));
+
+ // (3) raise exception when the target bucket is nonexistent.
+ assertThrows(FileNotFoundException.class, () ->
gcsUtil.copyV2(srcPaths, errPaths));
+
+ // (4) raise exception when the source files are nonexistent (their bucket is missing).
+ assertThrows(FileNotFoundException.class, () ->
gcsUtil.copyV2(errPaths, dstPaths));
+ } else {
+ // The V1 API takes string paths.
+ final List<String> srcList =
+ srcPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
+ final List<String> dstList =
+ dstPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
+ final List<String> errList =
+ errPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
+
+ // (1) when the target files do not exist
+ gcsUtil.copy(srcList, dstList);
+ assertExists(dstPaths.get(0));
+ assertExists(dstPaths.get(1));
+
+ // (2) when the target files exist, no exception
+ gcsUtil.copy(srcList, dstList);
+
+ // (3) raise exception when the target bucket is nonexistent.
+ assertThrows(FileNotFoundException.class, () -> gcsUtil.copy(srcList,
errList));
+
+ // (4) raise exception when the source files are nonexistent.
+ assertThrows(FileNotFoundException.class, () -> gcsUtil.copy(errList,
dstList));
+ }
+ } finally {
+ // Always clean up, even when an assertion above fails.
+ tearDownTestBucketHelper(existingBucket);
+ }
+ }
+
+ /**
+ * Integration test for remove. Deletes the seeded objects, then checks how already-deleted
+ * objects and objects in a nonexistent bucket are handled under each MissingStrategy (V2) or the
+ * single V1 behavior.
+ */
+ @Test
+ public void testRemove() throws IOException {
+ final String existingBucket = "apache-beam-temp-bucket-12345";
+ // Presumed not to exist; paths into it exercise the missing-bucket handling.
+ final String nonExistentBucket = "my-random-test-bucket-12345";
+
+ try {
+ final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket);
+ final List<GcsPath> errPaths =
+ srcPaths.stream()
+ .map(o -> GcsPath.fromComponents(nonExistentBucket,
o.getObject()))
+ .collect(Collectors.toList());
+
+ assertExists(srcPaths.get(0));
+ assertExists(srcPaths.get(1));
+
+ if (experiment.equals("use_gcsutil_v2")) {
+ // (1) when the files to remove exist
+ gcsUtil.removeV2(srcPaths);
+ assertNotExists(srcPaths.get(0));
+ assertNotExists(srcPaths.get(1));
+
+ // (2) when the files to remove have been deleted
+ // (2a) no exception on SKIP_IF_MISSING (removeV2's default)
+ gcsUtil.removeV2(srcPaths);
+ gcsUtil.remove(srcPaths, MissingStrategy.SKIP_IF_MISSING);
+
+ // (2b) raise exception on FAIL_IF_MISSING
+ assertThrows(
+ FileNotFoundException.class,
+ () -> gcsUtil.remove(srcPaths, MissingStrategy.FAIL_IF_MISSING));
+
+ // (3) when the files are from a nonexistent bucket
+ // (3a) no exception on SKIP_IF_MISSING
+ gcsUtil.removeV2(errPaths);
+ gcsUtil.remove(errPaths, MissingStrategy.SKIP_IF_MISSING);
+
+ // (3b) raise exception on FAIL_IF_MISSING
+ assertThrows(
+ FileNotFoundException.class,
+ () -> gcsUtil.remove(errPaths, MissingStrategy.FAIL_IF_MISSING));
+ } else {
+ // The V1 API takes string paths.
+ final List<String> srcList =
+ srcPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
+ final List<String> errList =
+ errPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
+
+ // (1) when the files to remove exist
+ gcsUtil.remove(srcList);
+ assertNotExists(srcPaths.get(0));
+ assertNotExists(srcPaths.get(1));
+
+ // (2) when the files to remove have been deleted, no exception
+ gcsUtil.remove(srcList);
+
+ // (3) when the files are from a nonexistent bucket, no exception
+ gcsUtil.remove(errList);
+ }
+ } finally {
+ // Always clean up, even when an assertion above fails.
+ tearDownTestBucketHelper(existingBucket);
+ }
+ }
+
+  /**
+   * Integration test for rename. Renames temporary copies of the seeded objects to ".bak" names,
+   * then checks missing-source handling. Finally it checks the behavior when both sources and
+   * destinations exist, with and without SKIP_IF_DESTINATION_EXISTS.
+   */
+  @Test
+  public void testRename() throws IOException {
+    final String existingBucket = "apache-beam-temp-bucket-12345";
+    final String nonExistentBucket = "my-random-test-bucket-12345";
+
+    try {
+      final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket);
+      final List<GcsPath> tmpPaths =
+          srcPaths.stream()
+              .map(o -> GcsPath.fromComponents(existingBucket, "tmp/" + o.getObject()))
+              .collect(Collectors.toList());
+      final List<GcsPath> dstPaths =
+          srcPaths.stream()
+              .map(o -> GcsPath.fromComponents(existingBucket, o.getObject() + ".bak"))
+              .collect(Collectors.toList());
+      final List<GcsPath> errPaths =
+          srcPaths.stream()
+              .map(o -> GcsPath.fromComponents(nonExistentBucket, o.getObject()))
+              .collect(Collectors.toList());
+
+      assertNotExists(dstPaths.get(0));
+      assertNotExists(dstPaths.get(1));
+      if (experiment.equals("use_gcsutil_v2")) {
+        // Rename copies so that the seeded sources are still available for step (3).
+        gcsUtil.copyV2(srcPaths, tmpPaths);
+
+        // (1) when the source files exist and target files do not
+        gcsUtil.renameV2(tmpPaths, dstPaths);
+        assertNotExists(tmpPaths.get(0));
+        assertNotExists(tmpPaths.get(1));
+        assertExists(dstPaths.get(0));
+        assertExists(dstPaths.get(1));
+
+        // (2) when the source files do not exist
+        // (2a) no exception if IGNORE_MISSING_FILES is set
+        gcsUtil.renameV2(errPaths, dstPaths, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+
+        // (2b) raise exception if IGNORE_MISSING_FILES is not set
+        assertThrows(FileNotFoundException.class, () -> gcsUtil.renameV2(errPaths, dstPaths));
+
+        // (3) when both source files and target files exist
+        assertExists(srcPaths.get(0));
+        assertExists(srcPaths.get(1));
+        assertExists(dstPaths.get(0));
+        assertExists(dstPaths.get(1));
+
+        // (3a) SKIP_IF_DESTINATION_EXISTS must leave both sources and targets in place
+        gcsUtil.renameV2(
+            srcPaths, dstPaths, MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
+        assertExists(srcPaths.get(0));
+        assertExists(srcPaths.get(1));
+        assertExists(dstPaths.get(0));
+        assertExists(dstPaths.get(1));
+
+        // (3b) without the option, the targets are overwritten and the sources are deleted
+        gcsUtil.renameV2(srcPaths, dstPaths);
+        assertNotExists(srcPaths.get(0));
+        assertNotExists(srcPaths.get(1));
+        assertExists(dstPaths.get(0));
+        assertExists(dstPaths.get(1));
+      } else {
+        // The V1 API takes string paths.
+        final List<String> srcList =
+            srcPaths.stream().map(o -> o.toString()).collect(Collectors.toList());
+        final List<String> tmpList =
+            tmpPaths.stream().map(o -> o.toString()).collect(Collectors.toList());
+        final List<String> dstList =
+            dstPaths.stream().map(o -> o.toString()).collect(Collectors.toList());
+        final List<String> errList =
+            errPaths.stream().map(o -> o.toString()).collect(Collectors.toList());
+
+        // Rename copies so that the seeded sources are still available for step (3).
+        gcsUtil.copy(srcList, tmpList);
+
+        // (1) when the source files exist and target files do not
+        gcsUtil.rename(tmpList, dstList);
+        assertNotExists(tmpPaths.get(0));
+        assertNotExists(tmpPaths.get(1));
+        assertExists(dstPaths.get(0));
+        assertExists(dstPaths.get(1));
+
+        // (2) when the source files do not exist
+        // (2a) no exception if IGNORE_MISSING_FILES is set
+        gcsUtil.rename(errList, dstList, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+
+        // (2b) raise exception if IGNORE_MISSING_FILES is not set
+        assertThrows(FileNotFoundException.class, () -> gcsUtil.rename(errList, dstList));
+
+        // (3) when both source files and target files exist
+        assertExists(srcPaths.get(0));
+        assertExists(srcPaths.get(1));
+        assertExists(dstPaths.get(0));
+        assertExists(dstPaths.get(1));
+
+        // There is a bug in V1 where SKIP_IF_DESTINATION_EXISTS is not honored, so the sources
+        // are renamed anyway. TODO: once fixed, assert that the sources still exist instead.
+        gcsUtil.rename(
+            srcList, dstList, MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
+
+        assertNotExists(srcPaths.get(0)); // BUG! The renaming is supposed to be skipped
+        assertNotExists(srcPaths.get(1)); // BUG! The renaming is supposed to be skipped
+        assertExists(dstPaths.get(0));
+        assertExists(dstPaths.get(1));
+      }
+    } finally {
+      tearDownTestBucketHelper(existingBucket);
+    }
+  }
+
+  /**
+   * Checks that {@code path} exists by fetching its metadata with the API selected by {@code
+   * experiment}; a missing object makes the lookup throw.
+   */
+  private void assertExists(GcsPath path) throws IOException {
+    if (!experiment.equals("use_gcsutil_v2")) {
+      gcsUtil.getObject(path);
+      return;
+    }
+    gcsUtil.getBlob(path);
+  }
+
+  /**
+   * Asserts that fetching {@code path} with the API selected by {@code experiment} throws {@link
+   * FileNotFoundException}.
+   */
+  private void assertNotExists(GcsPath path) throws IOException {
+    final boolean useV2 = experiment.equals("use_gcsutil_v2");
+    assertThrows(
+        FileNotFoundException.class,
+        () -> {
+          if (useV2) {
+            gcsUtil.getBlob(path);
+          } else {
+            gcsUtil.getObject(path);
+          }
+        });
+  }
}
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
index f344c6c2dba..9512fec312c 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java
@@ -351,6 +351,7 @@ public class GcsPathTest {
@Test
public void testIsWildcard() {
+ assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/*")));
assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo*")));
assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo?")));
assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo[a-z]")));
@@ -359,6 +360,7 @@ public class GcsPathTest {
@Test
public void testGetNonWildcardPrefix() {
+ assertEquals("gs://bucket/",
GcsPath.getNonWildcardPrefix("gs://bucket/*"));
assertEquals("gs://bucket/foo",
GcsPath.getNonWildcardPrefix("gs://bucket/foo*"));
assertEquals("gs://bucket/foo",
GcsPath.getNonWildcardPrefix("gs://bucket/foo?"));
assertEquals("gs://bucket/foo",
GcsPath.getNonWildcardPrefix("gs://bucket/foo[a-z]"));