scwhittle commented on a change in pull request #15353:
URL: https://github.com/apache/beam/pull/15353#discussion_r703363562
##########
File path:
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
##########
@@ -871,6 +876,10 @@ private void rewriteHelper(
for (int i = 0; i < srcList.size(); i++) {
final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i));
final GcsPath destPath = GcsPath.fromUri(destList.get(i));
+ if (ignoreExistingDest &&
!sourcePath.getBucket().equals(destPath.getBucket())) {
Review comment:
This was relying on FileBasedSink using FileSystems.rename, which handles an
UnsupportedOperationException thrown by the underlying file system's rename by
falling back to matching to filter files.
https://github.com/apache/beam/blob/a0fbe00ef12b72ec89672ab32ccc6d6331ca5edd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L310
##########
File path:
sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
##########
@@ -1049,6 +1051,48 @@ public void testRenamePropagateMissingException() throws
IOException {
verify(mockStorageRewrite, times(1)).execute();
}
+ @Test
+ public void testRenameSkipDestinationExistsSameBucket() throws IOException {
+ GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+ GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+ Storage mockStorage = Mockito.mock(Storage.class);
+ gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+
+ Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+ Storage.Objects.Rewrite mockStorageRewrite =
Mockito.mock(Storage.Objects.Rewrite.class);
+ Storage.Objects.Delete mockStorageDelete =
Mockito.mock(Storage.Objects.Delete.class);
+
+ when(mockStorage.objects()).thenReturn(mockStorageObjects);
+ when(mockStorageObjects.rewrite("bucket", "s0", "bucket", "d0", null))
+ .thenReturn(mockStorageRewrite);
+ when(mockStorageRewrite.execute()).thenReturn(new
RewriteResponse().setDone(true));
+ when(mockStorageObjects.delete("bucket",
"s0")).thenReturn(mockStorageDelete);
+
+ gcsUtil.rename(
+ makeStrings("s", 1), makeStrings("d", 1),
StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
+ verify(mockStorageRewrite, times(1)).execute();
+ verify(mockStorageDelete, times(1)).execute();
+ }
+
+ @Test
+ public void testRenameSkipDestinationExistsDifferentBucket() throws
IOException {
+ GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+ GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+ Storage mockStorage = Mockito.mock(Storage.class);
+ gcsUtil.setStorageClient(mockStorage);
+
+ assertThrows(
+ UnsupportedOperationException.class,
Review comment:
See above on how this is handled by FileSystems.java
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]