chamikaramj commented on a change in pull request #15353:
URL: https://github.com/apache/beam/pull/15353#discussion_r701299769
##########
File path:
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
##########
@@ -871,6 +876,10 @@ private void rewriteHelper(
for (int i = 0; i < srcList.size(); i++) {
final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i));
final GcsPath destPath = GcsPath.fromUri(destList.get(i));
+ if (ignoreExistingDest &&
!sourcePath.getBucket().equals(destPath.getBucket())) {
Review comment:
Unless we actually plumb this through to the user as a PipelineOption,
this exception will not be actionable to the end user.
Won't this end up being a non-actionable hard regression for some users,
since we always set this in FileBasedSink?
https://github.com/apache/beam/blob/a0fbe00ef12b72ec89672ab32ccc6d6331ca5edd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L777
##########
File path:
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
##########
@@ -793,7 +793,7 @@ public void onFailure(GoogleJsonError e, HttpHeaders
responseHeaders) throws IOE
public void copy(Iterable<String> srcFilenames, Iterable<String>
destFilenames)
throws IOException {
- rewriteHelper(srcFilenames, destFilenames, false, false);
+ rewriteHelper(srcFilenames, destFilenames, false, false, false);
Review comment:
Please add comments to clarify what the parameters are (for readability).
##########
File path:
sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
##########
@@ -1049,6 +1051,48 @@ public void testRenamePropagateMissingException() throws
IOException {
verify(mockStorageRewrite, times(1)).execute();
}
+ @Test
+ public void testRenameSkipDestinationExistsSameBucket() throws IOException {
+ GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+ GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+ Storage mockStorage = Mockito.mock(Storage.class);
+ gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher());
+
+ Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+ Storage.Objects.Rewrite mockStorageRewrite =
Mockito.mock(Storage.Objects.Rewrite.class);
+ Storage.Objects.Delete mockStorageDelete =
Mockito.mock(Storage.Objects.Delete.class);
+
+ when(mockStorage.objects()).thenReturn(mockStorageObjects);
+ when(mockStorageObjects.rewrite("bucket", "s0", "bucket", "d0", null))
+ .thenReturn(mockStorageRewrite);
+ when(mockStorageRewrite.execute()).thenReturn(new
RewriteResponse().setDone(true));
+ when(mockStorageObjects.delete("bucket",
"s0")).thenReturn(mockStorageDelete);
+
+ gcsUtil.rename(
+ makeStrings("s", 1), makeStrings("d", 1),
StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
+ verify(mockStorageRewrite, times(1)).execute();
+ verify(mockStorageDelete, times(1)).execute();
+ }
+
+ @Test
+ public void testRenameSkipDestinationExistsDifferentBucket() throws
IOException {
+ GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+ GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+ Storage mockStorage = Mockito.mock(Storage.class);
+ gcsUtil.setStorageClient(mockStorage);
+
+ assertThrows(
+ UnsupportedOperationException.class,
Review comment:
How can users perform such writes?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]