This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new aa574f5e6e7 Don't use batch interface for single object operations (#22432) aa574f5e6e7 is described below commit aa574f5e6e78b311744793701e9f6b6abf04d5a9 Author: Steven Niemitz <steveniem...@gmail.com> AuthorDate: Wed Aug 17 18:39:42 2022 -0400 Don't use batch interface for single object operations (#22432) --- .../beam/sdk/extensions/gcp/util/GcsUtil.java | 18 ++++- .../beam/sdk/extensions/gcp/util/GcsUtilTest.java | 86 +++++++++++++++++++++- 2 files changed, 101 insertions(+), 3 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index c5d92a1a35c..2acec4f2386 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -361,6 +361,22 @@ public class GcsUtil { * GcsPath GcsPaths}. 
*/ public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths) throws IOException { + if (gcsPaths.isEmpty()) { + return ImmutableList.of(); + } else if (gcsPaths.size() == 1) { + GcsPath path = gcsPaths.get(0); + try { + StorageObject object = getObject(path); + return ImmutableList.of(StorageObjectOrIOException.create(object)); + } catch (IOException e) { + return ImmutableList.of(StorageObjectOrIOException.create(e)); + } catch (Exception e) { + IOException ioException = + new IOException(String.format("Error trying to get %s: %s", path, e)); + return ImmutableList.of(StorageObjectOrIOException.create(ioException)); + } + } + List<StorageObjectOrIOException[]> results = new ArrayList<>(); executeBatches(makeGetBatches(gcsPaths, results)); ImmutableList.Builder<StorageObjectOrIOException> ret = ImmutableList.builder(); @@ -749,7 +765,7 @@ public class GcsUtil { List<CompletionStage<Void>> futures = new ArrayList<>(); for (final BatchInterface batch : batches) { - futures.add(MoreFutures.runAsync(() -> batch.execute(), executor)); + futures.add(MoreFutures.runAsync(batch::execute, executor)); } try { diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java index 8c02d219858..33a87c6d0ee 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java @@ -476,6 +476,16 @@ public class GcsUtilTest { String content = contentBoundaryLine + + "\n" + + "Content-Type: application/http\n" + + "\n" + + "HTTP/1.1 404 Not Found\n" + + "Content-Length: -1\n" + + "\n" + + error.toString() + + "\n" + + "\n" + + contentBoundaryLine + "\n" + "Content-Type: application/http\n" + "\n" 
@@ -499,6 +509,27 @@ public class GcsUtilTest { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); + gcsUtil.fileSizes( + ImmutableList.of( + GcsPath.fromComponents("testbucket", "testobject"), + GcsPath.fromComponents("testbucket", "testobject2"))); + } + + @Test + public void testGetSizeBytesWhenFileNotFoundNoBatch() throws Exception { + thrown.expect(FileNotFoundException.class); + MockLowLevelHttpResponse notFoundResponse = + new MockLowLevelHttpResponse() + .setContentType("text/plain") + .setContent("error") + .setStatusCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND); + + MockHttpTransport mockTransport = + new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build(); + + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); } @@ -525,8 +556,18 @@ public class GcsUtilTest { + error.toString() + "\n" + "\n" - + endOfContentBoundaryLine - + "\n"; + + contentBoundaryLine + + "\n" + + "Content-Type: application/http\n" + + "\n" + + "HTTP/1.1 404 Not Found\n" + + "Content-Length: -1\n" + + "\n" + + error.toString() + + "\n" + + "\n" + + endOfContentBoundaryLine; + thrown.expect(FileNotFoundException.class); final LowLevelHttpResponse[] mockResponses = @@ -559,6 +600,47 @@ public class GcsUtilTest { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + gcsUtil.setStorageClient( + new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())); + gcsUtil.fileSizes( + ImmutableList.of( + GcsPath.fromComponents("testbucket", "testobject"), + GcsPath.fromComponents("testbucket", "testobject2"))); + } + + @Test + public void testGetSizeBytesWhenFileNotFoundNoBatchRetry() throws Exception { + 
thrown.expect(FileNotFoundException.class); + + final LowLevelHttpResponse[] mockResponses = + new LowLevelHttpResponse[] { + Mockito.mock(LowLevelHttpResponse.class), Mockito.mock(LowLevelHttpResponse.class), + }; + when(mockResponses[0].getContentType()).thenReturn("text/plain"); + when(mockResponses[1].getContentType()).thenReturn("text/plain"); + + // 429: Too many requests, then 404: Not Found. + when(mockResponses[0].getStatusCode()).thenReturn(429); + when(mockResponses[1].getStatusCode()).thenReturn(404); + when(mockResponses[0].getContent()).thenReturn(toStream("error")); + when(mockResponses[1].getContent()).thenReturn(toStream("error")); + + // A mock transport that lets us mock the API responses. + MockHttpTransport mockTransport = + new MockHttpTransport.Builder() + .setLowLevelHttpRequest( + new MockLowLevelHttpRequest() { + int index = 0; + + @Override + public LowLevelHttpResponse execute() throws IOException { + return mockResponses[index++]; + } + }) + .build(); + + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + gcsUtil.setStorageClient( new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())); gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));