gemini-code-assist[bot] commented on code in PR #37592:
URL: https://github.com/apache/beam/pull/37592#discussion_r2819175993
##########
sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java:
##########
@@ -297,4 +300,295 @@ public void testCreateAndRemoveBucket() throws
IOException {
}
}
}
+
+ private List<GcsPath> createTestBucketHelper(String bucketName) throws
IOException {
+ final List<GcsPath> originPaths =
+ Arrays.asList(
+
GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii.txt"),
+
GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardiii.txt"));
+
+ final List<GcsPath> testPaths =
+ originPaths.stream()
+ .map(o -> GcsPath.fromComponents(bucketName, o.getObject()))
+ .collect(Collectors.toList());
+
+ // create bucket and copy some initial files into there
+ if (experiment.equals("use_gcsutil_v2")) {
+ gcsUtil.createBucket(BucketInfo.of(bucketName));
+
+ gcsUtil.copyV2(originPaths, testPaths);
+ } else {
+ GcsOptions gcsOptions = options.as(GcsOptions.class);
+ gcsUtil.createBucket(gcsOptions.getProject(), new
Bucket().setName(bucketName));
+
+ final List<String> originList =
+ originPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
+ final List<String> testList =
+ testPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
+ gcsUtil.copy(originList, testList);
+ }
+
+ return testPaths;
+ }
+
+ private void tearDownTestBucketHelper(String bucketName) {
+ try {
+ // use "**" in the pattern to match any characters including "/".
+ final List<GcsPath> paths =
+ gcsUtil.expand(GcsPath.fromUri(String.format("gs://%s/**",
bucketName)));
+ if (experiment.equals("use_gcsutil_v2")) {
+ gcsUtil.remove(paths, MissingStrategy.SKIP_IF_MISSING);
+ gcsUtil.removeBucket(BucketInfo.of(bucketName));
+ } else {
+
gcsUtil.remove(paths.stream().map(GcsPath::toString).collect(Collectors.toList()));
+ gcsUtil.removeBucket(new Bucket().setName(bucketName));
+ }
+ } catch (IOException e) {
+ }
Review Comment:

Swallowing exceptions, even in test teardown, can hide issues and make
debugging harder. It's better to at least log the exception. Consider adding an
SLF4J logger to this class and logging the exception at the `WARN` level (the
snippet below uses `System.err` as a minimal alternative if no logger is added).
```java
} catch (IOException e) {
System.err.println(
"Error during tear down of test bucket " + bucketName + ": " +
e.getMessage());
}
```
##########
sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java:
##########
@@ -297,4 +300,295 @@ public void testCreateAndRemoveBucket() throws
IOException {
}
}
}
+
+ private List<GcsPath> createTestBucketHelper(String bucketName) throws
IOException {
+ final List<GcsPath> originPaths =
+ Arrays.asList(
+
GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii.txt"),
+
GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardiii.txt"));
+
+ final List<GcsPath> testPaths =
+ originPaths.stream()
+ .map(o -> GcsPath.fromComponents(bucketName, o.getObject()))
+ .collect(Collectors.toList());
+
+ // create bucket and copy some initial files into there
+ if (experiment.equals("use_gcsutil_v2")) {
+ gcsUtil.createBucket(BucketInfo.of(bucketName));
+
+ gcsUtil.copyV2(originPaths, testPaths);
+ } else {
+ GcsOptions gcsOptions = options.as(GcsOptions.class);
+ gcsUtil.createBucket(gcsOptions.getProject(), new
Bucket().setName(bucketName));
+
+ final List<String> originList =
+ originPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
+ final List<String> testList =
+ testPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
+ gcsUtil.copy(originList, testList);
+ }
+
+ return testPaths;
+ }
+
+ private void tearDownTestBucketHelper(String bucketName) {
+ try {
+ // use "**" in the pattern to match any characters including "/".
+ final List<GcsPath> paths =
+ gcsUtil.expand(GcsPath.fromUri(String.format("gs://%s/**",
bucketName)));
+ if (experiment.equals("use_gcsutil_v2")) {
+ gcsUtil.remove(paths, MissingStrategy.SKIP_IF_MISSING);
+ gcsUtil.removeBucket(BucketInfo.of(bucketName));
+ } else {
+
gcsUtil.remove(paths.stream().map(GcsPath::toString).collect(Collectors.toList()));
+ gcsUtil.removeBucket(new Bucket().setName(bucketName));
+ }
+ } catch (IOException e) {
+ }
+ }
+
+ @Test
+ public void testCopy() throws IOException {
+ final String existingBucket = "apache-beam-temp-bucket-12345";
+ final String nonExistentBucket = "my-random-test-bucket-12345";
+
+ try {
+ final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket);
+ final List<GcsPath> dstPaths =
+ srcPaths.stream()
+ .map(o -> GcsPath.fromComponents(existingBucket, o.getObject() +
".bak"))
+ .collect(Collectors.toList());
+ final List<GcsPath> errPaths =
+ srcPaths.stream()
+ .map(o -> GcsPath.fromComponents(nonExistentBucket,
o.getObject()))
+ .collect(Collectors.toList());
+
+ assertNotExists(dstPaths.get(0));
+ assertNotExists(dstPaths.get(1));
+
+ if (experiment.equals("use_gcsutil_v2")) {
+ // (1) when the target files do not exist
+ gcsUtil.copyV2(srcPaths, dstPaths);
+ assertExists(dstPaths.get(0));
+ assertExists(dstPaths.get(1));
+
+ // (2) when the target files exist
+ // (2a) no exception on SAFE_OVERWRITE, ALWAYS_OVERWRITE,
SKIP_IF_EXISTS
+ gcsUtil.copyV2(srcPaths, dstPaths);
+ gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.ALWAYS_OVERWRITE);
+ gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.SKIP_IF_EXISTS);
+
+ // (2b) raise exception on FAIL_IF_EXISTS
+ assertThrows(
+ FileAlreadyExistsException.class,
+ () -> gcsUtil.copy(srcPaths, dstPaths,
OverwriteStrategy.FAIL_IF_EXISTS));
+
+ // (3) raise exception when the target bucket is nonexistent.
+ assertThrows(FileNotFoundException.class, () ->
gcsUtil.copyV2(srcPaths, errPaths));
+
+ // (4) raise exception when the source files are nonexistent.
+ assertThrows(FileNotFoundException.class, () ->
gcsUtil.copyV2(errPaths, dstPaths));
+ } else {
+ final List<String> srcList =
+ srcPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
+ final List<String> dstList =
+ dstPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
+ final List<String> errList =
+ errPaths.stream().map(o ->
o.toString()).collect(Collectors.toList());
+
+ // (1) when the target files do not exist
+ gcsUtil.copy(srcList, dstList);
+ assertExists(dstPaths.get(0));
+ assertExists(dstPaths.get(1));
+
+ // (2) when the target files exist, no exception
+ gcsUtil.copy(srcList, dstList);
+
+ // (3) raise exception when the target bucket is nonexistent.
+ assertThrows(FileNotFoundException.class, () -> gcsUtil.copy(srcList,
errList));
+
+ // (4) raise exception when the source files are nonexistent.
+ assertThrows(FileNotFoundException.class, () -> gcsUtil.copy(errList,
dstList));
+ }
+ } catch (IOException e) {
+ throw e;
Review Comment:

This `catch` block is redundant as it just re-throws the caught exception.
It can be removed to simplify the code. This applies to `testRemove` and
`testRename` as well.
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java:
##########
@@ -259,6 +273,144 @@ public List<GcsPath> expand(GcsPath gcsPattern) throws
IOException {
return results;
}
+ public enum MissingStrategy {
+ FAIL_IF_MISSING,
+ SKIP_IF_MISSING,
+ }
+
+ public void remove(Iterable<GcsPath> paths, MissingStrategy strategy) throws
IOException {
+ for (List<GcsPath> pathPartition :
+ Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {
+
+ // Create a new empty batch every time
+ StorageBatch batch = storage.batch();
+ List<StorageBatchResult<Boolean>> batchResultFutures = new ArrayList<>();
+
+ for (GcsPath path : pathPartition) {
+ batchResultFutures.add(batch.delete(path.getBucket(),
path.getObject()));
+ }
+ batch.submit();
+
+ for (int i = 0; i < batchResultFutures.size(); i++) {
+ StorageBatchResult<Boolean> future = batchResultFutures.get(i);
+ try {
+ Boolean deleted = future.get();
+ if (!deleted) {
+ if (strategy == MissingStrategy.FAIL_IF_MISSING) {
+ throw new FileNotFoundException(
+ String.format(
+ "The specified file does not exist: %s",
pathPartition.get(i).toString()));
+ } else {
+ LOG.warn("Ignoring failed deletion on file {}.",
pathPartition.get(i).toString());
+ }
+ }
+ } catch (StorageException e) {
+ throw translateStorageException(pathPartition.get(i), e);
+ }
+ }
+ }
+ }
+
+ public enum OverwriteStrategy {
+ FAIL_IF_EXISTS, // Fail if target exists
+ SKIP_IF_EXISTS, // Skip if target exists
+ SAFE_OVERWRITE, // Overwrite only if the generation matches (atomic)
+ ALWAYS_OVERWRITE // Overwrite regardless of state
+ }
+
+ private void rewriteHelper(
+ Iterable<GcsPath> srcPaths,
+ Iterable<GcsPath> dstPaths,
+ boolean deleteSrc,
+ MissingStrategy srcMissing,
+ OverwriteStrategy dstOverwrite)
+ throws IOException {
+ List<GcsPath> srcList = Lists.newArrayList(srcPaths);
+ List<GcsPath> dstList = Lists.newArrayList(dstPaths);
+ checkArgument(
+ srcList.size() == dstList.size(),
+ "Number of source files %s must equal number of destination files %s",
+ srcList.size(),
+ dstList.size());
+
+ for (int i = 0; i < srcList.size(); i++) {
+ GcsPath srcPath = srcList.get(i);
+ GcsPath dstPath = dstList.get(i);
+ BlobId srcId = BlobId.of(srcPath.getBucket(), srcPath.getObject());
+ BlobId dstId = BlobId.of(dstPath.getBucket(), dstPath.getObject());
+
+ CopyRequest.Builder copyRequestBuilder =
+ CopyRequest.newBuilder()
+ .setSource(srcId)
+ .setMegabytesCopiedPerChunk(MEGABYTES_COPIED_PER_CHUNK);
+
+ if (dstOverwrite == OverwriteStrategy.ALWAYS_OVERWRITE) {
+ copyRequestBuilder.setTarget(dstId);
+ } else {
+ // FAIL_IF_EXISTS, SKIP_IF_EXISTS and SAFE_OVERWRITE require checking
the target blob
+ BlobInfo existingTarget;
+ try {
+ existingTarget = storage.get(dstId);
+ } catch (StorageException e) {
+ throw translateStorageException(dstPath, e);
+ }
+
+ if (existingTarget == null) {
+ copyRequestBuilder.setTarget(dstId,
Storage.BlobTargetOption.doesNotExist());
+ } else {
+ switch (dstOverwrite) {
+ case SKIP_IF_EXISTS:
+ LOG.warn("Ignoring rewriting from {} to {} because target
exists.", srcPath, dstPath);
+ continue; // Skip to next file in for-loop
+
+ case SAFE_OVERWRITE:
+ copyRequestBuilder.setTarget(
+ dstId,
Storage.BlobTargetOption.generationMatch(existingTarget.getGeneration()));
+ break;
+
+ case FAIL_IF_EXISTS:
+ default:
+ throw new FileAlreadyExistsException(
+ srcPath.toString(),
+ dstPath.toString(),
+ "Target object already exists and strategy is
FAIL_IF_EXISTS");
+ }
+ }
+ }
+
+ try {
+ CopyWriter copyWriter = storage.copy(copyRequestBuilder.build());
+ copyWriter.getResult();
+
+ if (deleteSrc) {
+ storage.get(srcId).delete();
Review Comment:

Calling `storage.get(srcId).delete()` performs an unnecessary `GET` request
before the `DELETE`. More importantly, if `get(srcId)` returns `null` (e.g.,
due to a race condition where the file is deleted after copy but before this
line), it will cause a `NullPointerException`. It's safer and more efficient to
use `storage.delete(srcId)` directly and handle its boolean return value.
```java
if (!storage.delete(srcId)) {
// This may happen if the source file is deleted by another
process after copy.
LOG.warn(
"Source file {} could not be deleted after move to {}. It
may not have existed.",
srcPath,
dstPath);
}
```
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java:
##########
@@ -433,12 +438,69 @@ public void copy(Iterable<String> srcFilenames,
Iterable<String> destFilenames)
delegate.copy(srcFilenames, destFilenames);
}
+ /** experimental api. */
+ public void copyV2(Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths)
throws IOException {
+ copy(srcPaths, dstPaths, OverwriteStrategy.SAFE_OVERWRITE);
+ }
+
+ /** experimental api. */
+ public void copy(
+ Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths,
OverwriteStrategy strategy)
+ throws IOException {
+ if (delegateV2 != null) {
+ delegateV2.copy(srcPaths, dstPaths, strategy);
+ } else {
+ throw new IOException("GcsUtil V2 not initialized.");
+ }
+ }
+
public void rename(
Iterable<String> srcFilenames, Iterable<String> destFilenames,
MoveOptions... moveOptions)
throws IOException {
delegate.rename(srcFilenames, destFilenames, moveOptions);
}
+ /** experimental api. */
+ public void renameV2(
+ Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths, MoveOptions...
moveOptions)
+ throws IOException {
+ Set<MoveOptions> moveOptionSet = Sets.newHashSet(moveOptions);
+ final MissingStrategy srcMissing;
+ final OverwriteStrategy dstOverwrite;
+
+ if (moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES)) {
+ srcMissing = MissingStrategy.SKIP_IF_MISSING;
+ } else {
+ srcMissing = MissingStrategy.FAIL_IF_MISSING;
+ }
+
+ if
(moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS)) {
+ dstOverwrite = OverwriteStrategy.SKIP_IF_EXISTS;
+ } else {
+ dstOverwrite = OverwriteStrategy.SAFE_OVERWRITE;
+ }
+
+ if (delegateV2 != null) {
+ delegateV2.move(srcPaths, dstPaths, srcMissing, dstOverwrite);
+ } else {
+ throw new IOException("GcsUtil V2 not initialized.");
+ }
Review Comment:

The `delegateV2` null check and delegation at the end of this method
(`renameV2`) duplicate the logic in the `rename` method overload that takes
`MissingStrategy` and `OverwriteStrategy`. You can avoid this by having
`renameV2` call the other `rename` method after determining the strategies.
```java
rename(srcPaths, dstPaths, srcMissing, dstOverwrite);
```
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java:
##########
@@ -259,6 +273,144 @@ public List<GcsPath> expand(GcsPath gcsPattern) throws
IOException {
return results;
}
+ public enum MissingStrategy {
+ FAIL_IF_MISSING,
+ SKIP_IF_MISSING,
+ }
+
+ public void remove(Iterable<GcsPath> paths, MissingStrategy strategy) throws
IOException {
+ for (List<GcsPath> pathPartition :
+ Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {
+
+ // Create a new empty batch every time
+ StorageBatch batch = storage.batch();
+ List<StorageBatchResult<Boolean>> batchResultFutures = new ArrayList<>();
+
+ for (GcsPath path : pathPartition) {
+ batchResultFutures.add(batch.delete(path.getBucket(),
path.getObject()));
+ }
+ batch.submit();
+
+ for (int i = 0; i < batchResultFutures.size(); i++) {
+ StorageBatchResult<Boolean> future = batchResultFutures.get(i);
+ try {
+ Boolean deleted = future.get();
+ if (!deleted) {
+ if (strategy == MissingStrategy.FAIL_IF_MISSING) {
+ throw new FileNotFoundException(
+ String.format(
+ "The specified file does not exist: %s",
pathPartition.get(i).toString()));
+ } else {
+ LOG.warn("Ignoring failed deletion on file {}.",
pathPartition.get(i).toString());
+ }
+ }
+ } catch (StorageException e) {
+ throw translateStorageException(pathPartition.get(i), e);
+ }
+ }
+ }
+ }
+
+ public enum OverwriteStrategy {
+ FAIL_IF_EXISTS, // Fail if target exists
+ SKIP_IF_EXISTS, // Skip if target exists
+ SAFE_OVERWRITE, // Overwrite only if the generation matches (atomic)
+ ALWAYS_OVERWRITE // Overwrite regardless of state
+ }
+
+ private void rewriteHelper(
+ Iterable<GcsPath> srcPaths,
+ Iterable<GcsPath> dstPaths,
+ boolean deleteSrc,
+ MissingStrategy srcMissing,
+ OverwriteStrategy dstOverwrite)
+ throws IOException {
+ List<GcsPath> srcList = Lists.newArrayList(srcPaths);
+ List<GcsPath> dstList = Lists.newArrayList(dstPaths);
+ checkArgument(
+ srcList.size() == dstList.size(),
+ "Number of source files %s must equal number of destination files %s",
+ srcList.size(),
+ dstList.size());
+
+ for (int i = 0; i < srcList.size(); i++) {
+ GcsPath srcPath = srcList.get(i);
+ GcsPath dstPath = dstList.get(i);
+ BlobId srcId = BlobId.of(srcPath.getBucket(), srcPath.getObject());
+ BlobId dstId = BlobId.of(dstPath.getBucket(), dstPath.getObject());
+
+ CopyRequest.Builder copyRequestBuilder =
+ CopyRequest.newBuilder()
+ .setSource(srcId)
+ .setMegabytesCopiedPerChunk(MEGABYTES_COPIED_PER_CHUNK);
+
+ if (dstOverwrite == OverwriteStrategy.ALWAYS_OVERWRITE) {
+ copyRequestBuilder.setTarget(dstId);
+ } else {
+ // FAIL_IF_EXISTS, SKIP_IF_EXISTS and SAFE_OVERWRITE require checking
the target blob
+ BlobInfo existingTarget;
+ try {
+ existingTarget = storage.get(dstId);
+ } catch (StorageException e) {
+ throw translateStorageException(dstPath, e);
+ }
+
+ if (existingTarget == null) {
+ copyRequestBuilder.setTarget(dstId,
Storage.BlobTargetOption.doesNotExist());
+ } else {
+ switch (dstOverwrite) {
+ case SKIP_IF_EXISTS:
+ LOG.warn("Ignoring rewriting from {} to {} because target
exists.", srcPath, dstPath);
+ continue; // Skip to next file in for-loop
+
+ case SAFE_OVERWRITE:
+ copyRequestBuilder.setTarget(
+ dstId,
Storage.BlobTargetOption.generationMatch(existingTarget.getGeneration()));
+ break;
+
+ case FAIL_IF_EXISTS:
+ default:
+ throw new FileAlreadyExistsException(
+ srcPath.toString(),
+ dstPath.toString(),
+ "Target object already exists and strategy is
FAIL_IF_EXISTS");
+ }
Review Comment:

Here `case FAIL_IF_EXISTS:` falls through to `default:`, so the two share one
body. Any future, unhandled `OverwriteStrategy` enum value would therefore also
throw `FileAlreadyExistsException` with a message that names `FAIL_IF_EXISTS`.
For better maintainability, it's clearer to give `case FAIL_IF_EXISTS:` its own
body and add a separate `default:` case that throws an `IllegalStateException`.
```java
case FAIL_IF_EXISTS:
throw new FileAlreadyExistsException(
srcPath.toString(),
dstPath.toString(),
"Target object already exists and strategy is
FAIL_IF_EXISTS");
default:
throw new IllegalStateException("Unknown OverwriteStrategy: "
+ dstOverwrite);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]