Repository: beam Updated Branches: refs/heads/master ed52d328d -> 6d3fe51ce
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java new file mode 100644 index 0000000..03668ce --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java @@ -0,0 +1,798 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +import com.google.api.client.googleapis.batch.BatchRequest; +import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpStatusCodes; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.LowLevelHttpRequest; +import com.google.api.client.http.LowLevelHttpResponse; +import com.google.api.client.json.GenericJson; +import com.google.api.client.json.Json; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.api.client.testing.http.HttpTesting; +import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.client.testing.http.MockLowLevelHttpRequest; +import com.google.api.client.testing.http.MockLowLevelHttpResponse; +import com.google.api.client.util.BackOff; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.Bucket; +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel; +import com.google.cloud.hadoop.util.ClientRequestHelper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; +import java.net.SocketTimeoutException; +import java.nio.channels.SeekableByteChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.AccessDeniedException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; +import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +/** Test case for {@link GcsUtil}. */ +@RunWith(JUnit4.class) +public class GcsUtilTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testGlobTranslation() { + assertEquals("foo", GcsUtil.globToRegexp("foo")); + assertEquals("fo[^/]*o", GcsUtil.globToRegexp("fo*o")); + assertEquals("f[^/]*o\\.[^/]", GcsUtil.globToRegexp("f*o.?")); + assertEquals("foo-[0-9][^/]*", GcsUtil.globToRegexp("foo-[0-9]*")); + } + + private static GcsOptions gcsOptionsWithTestCredential() { + GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); + pipelineOptions.setGcpCredential(new TestCredential()); + return pipelineOptions; + } + + @Test + public void testCreationWithDefaultOptions() { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + assertNotNull(pipelineOptions.getGcpCredential()); + } + + @Test + public void testUploadBufferSizeDefault() { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil util = pipelineOptions.getGcsUtil(); + assertNull(util.getUploadBufferSizeBytes()); + } + + @Test + public void testUploadBufferSizeUserSpecified() { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + pipelineOptions.setGcsUploadBufferSizeBytes(12345); + GcsUtil util = pipelineOptions.getGcsUtil(); + assertEquals((Integer) 12345, util.getUploadBufferSizeBytes()); + } + + @Test + public void testCreationWithExecutorServiceProvided() { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + pipelineOptions.setExecutorService(Executors.newCachedThreadPool()); + assertSame(pipelineOptions.getExecutorService(), pipelineOptions.getGcsUtil().executorService); + } + + @Test + public void testCreationWithGcsUtilProvided() { + GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); + GcsUtil gcsUtil = Mockito.mock(GcsUtil.class); + pipelineOptions.setGcsUtil(gcsUtil); + assertSame(gcsUtil, pipelineOptions.getGcsUtil()); + } + + @Test + public void testMultipleThreadsCanCompleteOutOfOrderWithDefaultThreadPool() throws Exception { + GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); + ExecutorService executorService = pipelineOptions.getExecutorService(); + + int numThreads = 100; + final CountDownLatch[] countDownLatches = new CountDownLatch[numThreads]; + for (int i = 0; i < numThreads; i++) { + final int currentLatch = i; + countDownLatches[i] = new CountDownLatch(1); + executorService.execute( + new Runnable() { + @Override + public void run() { + // Wait for latch N and then release latch N - 1 + try { + countDownLatches[currentLatch].await(); + if (currentLatch > 0) { + countDownLatches[currentLatch - 1].countDown(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + }); + } + + // Release the last latch starting the chain reaction. + countDownLatches[countDownLatches.length - 1].countDown(); + executorService.shutdown(); + assertTrue("Expected tasks to complete", + executorService.awaitTermination(10, TimeUnit.SECONDS)); + } + + @Test + public void testGlobExpansion() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); + Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); + Storage.Objects.List mockStorageList = Mockito.mock(Storage.Objects.List.class); + + Objects modelObjects = new Objects(); + List<StorageObject> items = new ArrayList<>(); + // A directory + items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/")); + + // Files within the directory + items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file1name")); + items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file2name")); + items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file3name")); + items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile")); + items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/anotherfile")); + + modelObjects.setItems(items); + + when(mockStorage.objects()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket", "testdirectory/otherfile")).thenReturn( + mockStorageGet); + when(mockStorageObjects.list("testbucket")).thenReturn(mockStorageList); + when(mockStorageGet.execute()).thenReturn( + new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile")); + when(mockStorageList.execute()).thenReturn(modelObjects); + + // Test a single file. + { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"); + List<GcsPath> expectedFiles = + ImmutableList.of(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile")); + + assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); + } + + // Test patterns. + { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*"); + List<GcsPath> expectedFiles = ImmutableList.of( + GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); + + assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); + } + + { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*"); + List<GcsPath> expectedFiles = ImmutableList.of( + GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); + + assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); + } + + { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name"); + List<GcsPath> expectedFiles = ImmutableList.of( + GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); + + assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); + } + + { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name"); + List<GcsPath> expectedFiles = ImmutableList.of( + GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); + + assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); + } + } + + // Patterns that contain recursive wildcards ('**') are not supported. + @Test + public void testRecursiveGlobExpansionFails() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + GcsPath pattern = GcsPath.fromUri("gs://testbucket/test**"); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Unsupported wildcard usage"); + gcsUtil.expand(pattern); + } + + // GCSUtil.expand() should fail when matching a single object when that object does not exist. + // We should return the empty result since GCS get object is strongly consistent. + @Test + public void testNonExistentObjectReturnsEmptyResult() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); + Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); + + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/nonexistentfile"); + GoogleJsonResponseException expectedException = + googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND, + "It don't exist", "Nothing here to see"); + + when(mockStorage.objects()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn( + mockStorageGet); + when(mockStorageGet.execute()).thenThrow(expectedException); + + assertEquals(Collections.EMPTY_LIST, gcsUtil.expand(pattern)); + } + + // GCSUtil.expand() should fail for other errors such as access denied. + @Test + public void testAccessDeniedObjectThrowsIOException() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); + Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); + + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/accessdeniedfile"); + GoogleJsonResponseException expectedException = + googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, + "Waves hand mysteriously", "These aren't the buckets you're looking for"); + + when(mockStorage.objects()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn( + mockStorageGet); + when(mockStorageGet.execute()).thenThrow(expectedException); + + thrown.expect(IOException.class); + thrown.expectMessage("Unable to get the file object for path"); + gcsUtil.expand(pattern); + } + + @Test + public void testFileSizeNonBatch() throws Exception { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); + Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); + + when(mockStorage.objects()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()).thenReturn( + new StorageObject().setSize(BigInteger.valueOf(1000))); + + assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"))); + } + + @Test + public void testFileSizeWhenFileNotFoundNonBatch() throws Exception { + MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse(); + notFoundResponse.setContent(""); + notFoundResponse.setStatusCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND); + + MockHttpTransport mockTransport = + new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build(); + + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); + + thrown.expect(FileNotFoundException.class); + gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject")); + } + + @Test + public void testRetryFileSizeNonBatch() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); + Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff(); + + when(mockStorage.objects()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()) + .thenThrow(new SocketTimeoutException("SocketException")) + .thenThrow(new SocketTimeoutException("SocketException")) + .thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000))); + + assertEquals(1000, + gcsUtil.getObject( + GcsPath.fromComponents("testbucket", "testobject"), + mockBackOff, + new FastNanoClockAndSleeper()).getSize().longValue()); + assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis()); + } + + @Test + public void testGetSizeBytesWhenFileNotFoundBatch() throws Exception { + JsonFactory jsonFactory = new JacksonFactory(); + + String contentBoundary = "batch_foobarbaz"; + String contentBoundaryLine = "--" + contentBoundary; + String endOfContentBoundaryLine = "--" + contentBoundary + "--"; + + GenericJson error = new GenericJson() + .set("error", new GenericJson().set("code", 404)); + error.setFactory(jsonFactory); + + String content = contentBoundaryLine + "\n" + + "Content-Type: application/http\n" + + "\n" + + "HTTP/1.1 404 Not Found\n" + + "Content-Length: -1\n" + + "\n" + + error.toString() + + "\n" + + "\n" + + endOfContentBoundaryLine + + "\n"; + thrown.expect(FileNotFoundException.class); + MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse() + .setContentType("multipart/mixed; boundary=" + contentBoundary) + .setContent(content) + .setStatusCode(HttpStatusCodes.STATUS_CODE_OK); + + MockHttpTransport mockTransport = + new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build(); + + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + + gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); + gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); + } + + @Test + public void testGetSizeBytesWhenFileNotFoundBatchRetry() throws Exception { + JsonFactory jsonFactory = new JacksonFactory(); + + String contentBoundary = "batch_foobarbaz"; + String contentBoundaryLine = "--" + contentBoundary; + String endOfContentBoundaryLine = "--" + contentBoundary + "--"; + + GenericJson error = new GenericJson() + .set("error", new GenericJson().set("code", 404)); + error.setFactory(jsonFactory); + + String content = contentBoundaryLine + "\n" + + "Content-Type: application/http\n" + + "\n" + + "HTTP/1.1 404 Not Found\n" + + "Content-Length: -1\n" + + "\n" + + error.toString() + + "\n" + + "\n" + + endOfContentBoundaryLine + + "\n"; + thrown.expect(FileNotFoundException.class); + + final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class); + when(mockResponse.getContentType()).thenReturn("multipart/mixed; boundary=" + contentBoundary); + + // 429: Too many requests, then 200: OK. + when(mockResponse.getStatusCode()).thenReturn(429, 200); + when(mockResponse.getContent()).thenReturn(toStream("error"), toStream(content)); + + // A mock transport that lets us mock the API responses. + MockHttpTransport mockTransport = + new MockHttpTransport.Builder() + .setLowLevelHttpRequest( + new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() throws IOException { + return mockResponse; + } + }) + .build(); + + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + + gcsUtil.setStorageClient( + new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())); + gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); + } + + @Test + public void testCreateBucket() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.insert( + any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert); + when(mockStorageInsert.execute()) + .thenThrow(new SocketTimeoutException("SocketException")) + .thenReturn(new Bucket()); + + gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()); + } + + @Test + public void testCreateBucketAccessErrors() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); + Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + GoogleJsonResponseException expectedException = + googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, + "Waves hand mysteriously", "These aren't the buckets you're looking for"); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.insert( + any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert); + when(mockStorageInsert.execute()) + .thenThrow(expectedException); + + thrown.expect(AccessDeniedException.class); + + gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()); + } + + @Test + public void testBucketAccessible() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); + Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()) + .thenThrow(new SocketTimeoutException("SocketException")) + .thenReturn(new Bucket()); + + assertTrue(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"), + mockBackOff, new FastNanoClockAndSleeper())); + } + + @Test + public void testBucketDoesNotExistBecauseOfAccessError() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); + Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + GoogleJsonResponseException expectedException = + googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, + "Waves hand mysteriously", "These aren't the buckets you're looking for"); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()) + .thenThrow(expectedException); + + assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"), + mockBackOff, new FastNanoClockAndSleeper())); + } + + @Test + public void testBucketDoesNotExist() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); + Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()) + .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND, + "It don't exist", "Nothing here to see")); + + assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"), + mockBackOff, new FastNanoClockAndSleeper())); + } + + @Test + public void testGetBucket() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); + Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()) + .thenThrow(new SocketTimeoutException("SocketException")) + .thenReturn(new Bucket()); + + assertNotNull(gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"), + mockBackOff, new FastNanoClockAndSleeper())); + } + + @Test + public void testGetBucketNotExists() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); + Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()) + .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND, + "It don't exist", "Nothing here to see")); + + thrown.expect(FileNotFoundException.class); + thrown.expectMessage("It don't exist"); + gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"), + mockBackOff, new FastNanoClockAndSleeper()); + } + + @Test + public void testGCSChannelCloseIdempotent() throws IOException { + SeekableByteChannel channel = + new GoogleCloudStorageReadChannel(null, "dummybucket", "dummyobject", null, + new ClientRequestHelper<StorageObject>()); + channel.close(); + channel.close(); + } + + /** + * Builds a fake GoogleJsonResponseException for testing API error handling. + */ + private static GoogleJsonResponseException googleJsonResponseException( + final int status, final String reason, final String message) throws IOException { + final JsonFactory jsonFactory = new JacksonFactory(); + HttpTransport transport = new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) throws IOException { + ErrorInfo errorInfo = new ErrorInfo(); + errorInfo.setReason(reason); + errorInfo.setMessage(message); + errorInfo.setFactory(jsonFactory); + GenericJson error = new GenericJson(); + error.set("code", status); + error.set("errors", Arrays.asList(errorInfo)); + error.setFactory(jsonFactory); + GenericJson errorResponse = new GenericJson(); + errorResponse.set("error", error); + errorResponse.setFactory(jsonFactory); + return new MockLowLevelHttpRequest().setResponse( + new MockLowLevelHttpResponse().setContent(errorResponse.toPrettyString()) + .setContentType(Json.MEDIA_TYPE).setStatusCode(status)); + } + }; + HttpRequest request = + transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL); + request.setThrowExceptionOnExecuteError(false); + HttpResponse response = request.execute(); + return GoogleJsonResponseException.from(jsonFactory, response); + } + + private static List<String> makeStrings(String s, int n) { + ImmutableList.Builder<String> ret = ImmutableList.builder(); + for (int i = 0; i < n; ++i) { + ret.add(String.format("gs://bucket/%s%d", s, i)); + } + return ret.build(); + } + + private static List<GcsPath> makeGcsPaths(String s, int n) { + ImmutableList.Builder<GcsPath> ret = ImmutableList.builder(); + for (int i = 0; i < n; ++i) { + ret.add(GcsPath.fromUri(String.format("gs://bucket/%s%d", s, i))); + } + return ret.build(); + } + + private static int sumBatchSizes(List<BatchRequest> batches) { + int ret = 0; + for (BatchRequest b : batches) { + ret += b.size(); + assertThat(b.size(), greaterThan(0)); + } + return ret; + } + + @Test + public void testMakeCopyBatches() throws IOException { + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + + // Small number of files fits in 1 batch + List<BatchRequest> batches = gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 3)); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(3)); + + // 1 batch of files fits in 1 batch + batches = gcsUtil.makeCopyBatches(makeStrings("s", 100), makeStrings("d", 100)); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(100)); + + // A little more than 5 batches of files fits in 6 batches + batches = gcsUtil.makeCopyBatches(makeStrings("s", 501), makeStrings("d", 501)); + assertThat(batches.size(), equalTo(6)); + assertThat(sumBatchSizes(batches), equalTo(501)); + } + + @Test + public void testInvalidCopyBatches() throws IOException { + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Number of source files 3"); + + gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 1)); + } + + @Test + public void testMakeRemoveBatches() throws IOException { + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + + // Small number of files fits in 1 batch + List<BatchRequest> batches = gcsUtil.makeRemoveBatches(makeStrings("s", 3)); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(3)); + + // 1 batch of files fits in 1 batch + batches = gcsUtil.makeRemoveBatches(makeStrings("s", 100)); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(100)); + + // A little more than 5 batches of files fits in 6 batches + batches = gcsUtil.makeRemoveBatches(makeStrings("s", 501)); + assertThat(batches.size(), equalTo(6)); + assertThat(sumBatchSizes(batches), equalTo(501)); + } + + @Test + public void testMakeGetBatches() throws IOException { + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + + // Small number of files fits in 1 batch + List<StorageObjectOrIOException[]> results = Lists.newArrayList(); + List<BatchRequest> batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), results); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(3)); + assertEquals(3, results.size()); + + // 1 batch of files fits in 1 batch + results = Lists.newArrayList(); + batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 100), results); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(100)); + assertEquals(100, results.size()); + + // A little more than 5 batches of files fits in 6 batches + results = Lists.newArrayList(); + batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 501), results); + assertThat(batches.size(), equalTo(6)); + assertThat(sumBatchSizes(batches), equalTo(501)); + assertEquals(501, results.size()); + } + + /** + * A helper to wrap a {@link GenericJson} object in a content stream. + */ + private static InputStream toStream(String content) throws IOException { + return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java new file mode 100644 index 0000000..8e7878c --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link IntervalBoundedExponentialBackOff}. */ +@RunWith(JUnit4.class) +public class IntervalBoundedExponentialBackOffTest { + @Rule public ExpectedException exception = ExpectedException.none(); + + + @Test + public void testUsingInvalidInitialInterval() throws Exception { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Initial interval must be greater than zero."); + new IntervalBoundedExponentialBackOff(1000L, 0L); + } + + @Test + public void testUsingInvalidMaximumInterval() throws Exception { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Maximum interval must be greater than zero."); + new IntervalBoundedExponentialBackOff(-1L, 10L); + } + + @Test + public void testThatcertainNumberOfAttemptsReachesMaxInterval() throws Exception { + IntervalBoundedExponentialBackOff backOff = new IntervalBoundedExponentialBackOff(1000L, 500); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + } + + @Test + public void testThatResettingAllowsReuse() throws Exception { + IntervalBoundedExponentialBackOff backOff = new IntervalBoundedExponentialBackOff(1000L, 500); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + backOff.reset(); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + } + + @Test + public void testAtMaxInterval() throws Exception { + IntervalBoundedExponentialBackOff backOff = new IntervalBoundedExponentialBackOff(1000L, 500); + assertFalse(backOff.atMaxInterval()); + backOff.nextBackOffMillis(); + assertFalse(backOff.atMaxInterval()); + backOff.nextBackOffMillis(); + assertTrue(backOff.atMaxInterval()); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java new file mode 100644 index 0000000..71554b5 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpResponseException; +import com.google.api.client.http.HttpResponseInterceptor; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.LowLevelHttpRequest; +import com.google.api.client.http.LowLevelHttpResponse; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.client.testing.http.MockLowLevelHttpRequest; +import com.google.api.client.util.NanoClock; +import com.google.api.client.util.Sleeper; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.Storage.Objects.Get; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.security.PrivateKey; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicLong; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Tests for RetryHttpRequestInitializer. + */ +@RunWith(JUnit4.class) +public class RetryHttpRequestInitializerTest { + + @Mock private Credential mockCredential; + @Mock private PrivateKey mockPrivateKey; + @Mock private LowLevelHttpRequest mockLowLevelRequest; + @Mock private LowLevelHttpResponse mockLowLevelResponse; + @Mock private HttpResponseInterceptor mockHttpResponseInterceptor; + + private final JsonFactory jsonFactory = JacksonFactory.getDefaultInstance(); + private Storage storage; + + // Used to test retrying a request more than the default 10 times. + static class MockNanoClock implements NanoClock { + private int timesMs[] = {500, 750, 1125, 1688, 2531, 3797, 5695, 8543, + 12814, 19222, 28833, 43249, 64873, 97310, 145965, 218945, 328420}; + private int i = 0; + + @Override + public long nanoTime() { + return timesMs[i++ / 2] * 1000000; + } + } + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + HttpTransport lowLevelTransport = new HttpTransport() { + @Override + protected LowLevelHttpRequest buildRequest(String method, String url) + throws IOException { + return mockLowLevelRequest; + } + }; + + // Retry initializer will pass through to credential, since we can have + // only a single HttpRequestInitializer, and we use multiple Credential + // types in the SDK, not all of which allow for retry configuration. + RetryHttpRequestInitializer initializer = new RetryHttpRequestInitializer( + mockCredential, new MockNanoClock(), new Sleeper() { + @Override + public void sleep(long millis) throws InterruptedException {} + }, Arrays.asList(418 /* I'm a teapot */), mockHttpResponseInterceptor); + storage = new Storage.Builder(lowLevelTransport, jsonFactory, initializer) + .setApplicationName("test").build(); + } + + @After + public void tearDown() { + verifyNoMoreInteractions(mockPrivateKey); + verifyNoMoreInteractions(mockLowLevelRequest); + verifyNoMoreInteractions(mockCredential); + verifyNoMoreInteractions(mockHttpResponseInterceptor); + } + + @Test + public void testBasicOperation() throws IOException { + when(mockLowLevelRequest.execute()) + .thenReturn(mockLowLevelResponse); + when(mockLowLevelResponse.getStatusCode()) + .thenReturn(200); + + Storage.Buckets.Get result = storage.buckets().get("test"); + HttpResponse response = result.executeUnparsed(); + assertNotNull(response); + + verify(mockCredential).initialize(any(HttpRequest.class)); + verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); + verify(mockLowLevelRequest, atLeastOnce()) + .addHeader(anyString(), anyString()); + verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt()); + verify(mockLowLevelRequest).execute(); + verify(mockLowLevelResponse).getStatusCode(); + } + + /** + * Tests that a non-retriable error is not retried. + */ + @Test + public void testErrorCodeForbidden() throws IOException { + when(mockLowLevelRequest.execute()) + .thenReturn(mockLowLevelResponse); + when(mockLowLevelResponse.getStatusCode()) + .thenReturn(403) // Non-retryable error. + .thenReturn(200); // Shouldn't happen. + + try { + Storage.Buckets.Get result = storage.buckets().get("test"); + HttpResponse response = result.executeUnparsed(); + assertNotNull(response); + } catch (HttpResponseException e) { + Assert.assertThat(e.getMessage(), Matchers.containsString("403")); + } + + verify(mockCredential).initialize(any(HttpRequest.class)); + verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); + verify(mockLowLevelRequest, atLeastOnce()) + .addHeader(anyString(), anyString()); + verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt()); + verify(mockLowLevelRequest).execute(); + verify(mockLowLevelResponse).getStatusCode(); + } + + /** + * Tests that a retriable error is retried. + */ + @Test + public void testRetryableError() throws IOException { + when(mockLowLevelRequest.execute()) + .thenReturn(mockLowLevelResponse) + .thenReturn(mockLowLevelResponse) + .thenReturn(mockLowLevelResponse); + when(mockLowLevelResponse.getStatusCode()) + .thenReturn(503) // Retryable + .thenReturn(429) // We also retry on 429 Too Many Requests. + .thenReturn(200); + + Storage.Buckets.Get result = storage.buckets().get("test"); + HttpResponse response = result.executeUnparsed(); + assertNotNull(response); + + verify(mockCredential).initialize(any(HttpRequest.class)); + verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); + verify(mockLowLevelRequest, atLeastOnce()) + .addHeader(anyString(), anyString()); + verify(mockLowLevelRequest, times(3)).setTimeout(anyInt(), anyInt()); + verify(mockLowLevelRequest, times(3)).execute(); + verify(mockLowLevelResponse, times(3)).getStatusCode(); + } + + /** + * Tests that an IOException is retried. + */ + @Test + public void testThrowIOException() throws IOException { + when(mockLowLevelRequest.execute()) + .thenThrow(new IOException("Fake Error")) + .thenReturn(mockLowLevelResponse); + when(mockLowLevelResponse.getStatusCode()) + .thenReturn(200); + + Storage.Buckets.Get result = storage.buckets().get("test"); + HttpResponse response = result.executeUnparsed(); + assertNotNull(response); + + verify(mockCredential).initialize(any(HttpRequest.class)); + verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); + verify(mockLowLevelRequest, atLeastOnce()) + .addHeader(anyString(), anyString()); + verify(mockLowLevelRequest, times(2)).setTimeout(anyInt(), anyInt()); + verify(mockLowLevelRequest, times(2)).execute(); + verify(mockLowLevelResponse).getStatusCode(); + } + + /** + * Tests that a retryable error is retried enough times. + */ + @Test + public void testRetryableErrorRetryEnoughTimes() throws IOException { + when(mockLowLevelRequest.execute()).thenReturn(mockLowLevelResponse); + final int retries = 10; + when(mockLowLevelResponse.getStatusCode()).thenAnswer(new Answer<Integer>(){ + int n = 0; + @Override + public Integer answer(InvocationOnMock invocation) { + return (n++ < retries - 1) ? 503 : 200; + }}); + + Storage.Buckets.Get result = storage.buckets().get("test"); + HttpResponse response = result.executeUnparsed(); + assertNotNull(response); + + verify(mockCredential).initialize(any(HttpRequest.class)); + verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); + verify(mockLowLevelRequest, atLeastOnce()).addHeader(anyString(), + anyString()); + verify(mockLowLevelRequest, times(retries)).setTimeout(anyInt(), anyInt()); + verify(mockLowLevelRequest, times(retries)).execute(); + verify(mockLowLevelResponse, times(retries)).getStatusCode(); + } + + /** + * Tests that when RPCs fail with {@link SocketTimeoutException}, the IO exception handler + * is invoked. + */ + @Test + public void testIOExceptionHandlerIsInvokedOnTimeout() throws Exception { + // Counts the number of calls to execute the HTTP request. + final AtomicLong executeCount = new AtomicLong(); + + // 10 is a private internal constant in the Google API Client library. See + // com.google.api.client.http.HttpRequest#setNumberOfRetries + // TODO: update this test once the private internal constant is public. + final int defaultNumberOfRetries = 10; + + // A mock HTTP request that always throws SocketTimeoutException. + MockHttpTransport transport = + new MockHttpTransport.Builder().setLowLevelHttpRequest(new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() throws IOException { + executeCount.incrementAndGet(); + throw new SocketTimeoutException("Fake forced timeout exception"); + } + }).build(); + + // A sample HTTP request to Google Cloud Storage that uses both default Transport and default + // RetryHttpInitializer. + Storage storage = new Storage.Builder( + transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()).build(); + + Get getRequest = storage.objects().get("gs://fake", "file"); + + try { + getRequest.execute(); + fail(); + } catch (Throwable e) { + assertThat(e, Matchers.<Throwable>instanceOf(SocketTimeoutException.class)); + assertEquals(1 + defaultNumberOfRetries, executeCount.get()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml index 74aba0f..dde8be5 100644 --- a/sdks/java/extensions/pom.xml +++ b/sdks/java/extensions/pom.xml @@ -32,6 +32,7 @@ <name>Apache Beam :: SDKs :: Java :: Extensions</name> <modules> + <module>gcp-core</module> <module>jackson</module> <module>join-library</module> <module>sorter</module> http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/harness/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml index 3d14401..76ceb3d 100644 --- a/sdks/java/harness/pom.xml +++ b/sdks/java/harness/pom.xml @@ -73,6 +73,11 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-gcp-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-runners-core-java</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/io/google-cloud-platform/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index d22c6c5..4cd0337 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -66,7 +66,10 @@ <artifactId>beam-sdks-java-core</artifactId> </dependency> - <!-- Build dependencies --> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-gcp-core</artifactId> + </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> @@ -94,12 +97,6 @@ </dependency> <dependency> - <groupId>com.google.auto.service</groupId> - <artifactId>auto-service</artifactId> - <optional>true</optional> - </dependency> - - <dependency> <groupId>com.google.cloud.bigdataoss</groupId> <artifactId>util</artifactId> </dependency> @@ -232,13 +229,20 @@ <artifactId>avro</artifactId> </dependency> + <!-- Build dependencies --> <dependency> <groupId>com.google.auto.value</groupId> <artifactId>auto-value</artifactId> <scope>provided</scope> </dependency> - <!-- test --> + <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + <optional>true</optional> + </dependency> + + <!-- Test dependencies --> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId>
