Repository: beam Updated Branches: refs/heads/master c53249de4 -> faa2277b5
GcsUtil: set timeout and retry for BatchRequest with HttpRequestInitializer. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97a76d94 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97a76d94 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97a76d94 Branch: refs/heads/master Commit: 97a76d941776300ad3f77017869835327776f62e Parents: c53249d Author: Pei He <[email protected]> Authored: Tue Dec 13 17:16:12 2016 -0800 Committer: Dan Halperin <[email protected]> Committed: Wed Jan 18 12:26:09 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/util/GcsUtil.java | 20 +++++-- .../org/apache/beam/sdk/util/GcsUtilTest.java | 56 ++++++++++++++++++++ 2 files changed, 71 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/97a76d94/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index dcdba46..521673c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -25,6 +25,7 @@ import com.google.api.client.googleapis.batch.json.JsonBatchCallback; import com.google.api.client.googleapis.json.GoogleJsonError; import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.storage.Storage; @@ -93,8 +94,10 @@ public class GcsUtil { public GcsUtil create(PipelineOptions options) { LOG.debug("Creating new GcsUtil"); GcsOptions gcsOptions = options.as(GcsOptions.class); + Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions); return new GcsUtil( - Transport.newStorageClient(gcsOptions).build(), + storageBuilder.build(), + storageBuilder.getHttpRequestInitializer(), gcsOptions.getExecutorService(), gcsOptions.getGcsUploadBufferSizeBytes()); } @@ -132,6 +135,7 @@ public class GcsUtil { /** Client for the GCS API. */ private Storage storageClient; + private final HttpRequestInitializer httpRequestInitializer; /** Buffer size for GCS uploads (in bytes). */ @Nullable private final Integer uploadBufferSizeBytes; @@ -156,9 +160,11 @@ public class GcsUtil { private GcsUtil( Storage storageClient, + HttpRequestInitializer httpRequestInitializer, ExecutorService executorService, @Nullable Integer uploadBufferSizeBytes) { this.storageClient = storageClient; + this.httpRequestInitializer = httpRequestInitializer; this.uploadBufferSizeBytes = uploadBufferSizeBytes; this.executorService = executorService; } @@ -526,7 +532,7 @@ public class GcsUtil { List<BatchRequest> batches = new LinkedList<>(); for (List<GcsPath> filesToGet : Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { - BatchRequest batch = storageClient.batch(); + BatchRequest batch = createBatchRequest(); for (GcsPath path : filesToGet) { results.add(enqueueGetFileSize(path, batch)); } @@ -548,14 +554,14 @@ public class GcsUtil { destFilenames.size()); List<BatchRequest> batches = new LinkedList<>(); - BatchRequest batch = storageClient.batch(); + BatchRequest batch = createBatchRequest(); for (int i = 0; i < srcFilenames.size(); i++) { final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i)); final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i)); enqueueCopy(sourcePath, destPath, batch); if (batch.size() >= MAX_REQUESTS_PER_BATCH) { batches.add(batch); - batch = storageClient.batch(); + batch = createBatchRequest(); } } if (batch.size() > 0) { @@ -568,7 +574,7 @@ public class GcsUtil { List<BatchRequest> batches = new LinkedList<>(); for (List<String> filesToDelete : Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) { - BatchRequest batch = storageClient.batch(); + BatchRequest batch = createBatchRequest(); for (String file : filesToDelete) { enqueueDelete(GcsPath.fromUri(file), batch); } @@ -648,6 +654,10 @@ public class GcsUtil { }); } + private BatchRequest createBatchRequest() { + return storageClient.batch(httpRequestInitializer); + } + /** * Expands glob expressions to regular expressions. * http://git-wip-us.apache.org/repos/asf/beam/blob/97a76d94/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java index 6ca87f9..d592761 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java @@ -38,6 +38,7 @@ import com.google.api.client.http.HttpResponse; import com.google.api.client.http.HttpStatusCodes; import com.google.api.client.http.HttpTransport; import com.google.api.client.http.LowLevelHttpRequest; +import com.google.api.client.http.LowLevelHttpResponse; import com.google.api.client.json.GenericJson; import com.google.api.client.json.Json; import com.google.api.client.json.JsonFactory; @@ -55,11 +56,14 @@ import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel; import com.google.cloud.hadoop.util.ClientRequestHelper; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import java.io.ByteArrayInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.math.BigInteger; import java.net.SocketTimeoutException; import java.nio.channels.SeekableByteChannel; +import java.nio.charset.StandardCharsets; import java.nio.file.AccessDeniedException; import java.util.ArrayList; import java.util.Arrays; @@ -415,6 +419,51 @@ public class GcsUtilTest { } @Test + public void testGetSizeBytesWhenFileNotFoundBatchRetry() throws Exception { + JsonFactory jsonFactory = new JacksonFactory(); + + String contentBoundary = "batch_foobarbaz"; + + GenericJson error = new GenericJson() + .set("error", new GenericJson().set("code", 404)); + error.setFactory(jsonFactory); + + String content = contentBoundary + "\n" + + "Content-Type: application/http\n" + + "\n" + + "HTTP/1.1 404 Not Found\n" + + "Content-Length: 105\n" + + "\n" + + error.toString(); + thrown.expect(FileNotFoundException.class); + + final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class); + when(mockResponse.getContentType()).thenReturn("multipart/mixed; boundary=" + contentBoundary); + + // 429: Too many requests, then 200: OK. + when(mockResponse.getStatusCode()).thenReturn(429, 200); + when(mockResponse.getContent()).thenReturn(toStream("error"), toStream(content)); + + // A mock transport that lets us mock the API responses. + MockHttpTransport mockTransport = + new MockHttpTransport.Builder() + .setLowLevelHttpRequest( + new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() throws IOException { + return mockResponse; + } + }) + .build(); + + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + + gcsUtil.setStorageClient( + new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())); + gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); + } + + @Test public void testCreateBucket() throws IOException { GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); @@ -723,4 +772,11 @@ public class GcsUtilTest { assertThat(sumBatchSizes(batches), equalTo(501)); assertEquals(501, results.size()); } + + /** + * A helper to wrap a {@link GenericJson} object in a content stream. + */ + private static InputStream toStream(String content) throws IOException { + return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)); + } }
