Repository: beam
Updated Branches:
  refs/heads/master ed52d328d -> 6d3fe51ce


http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
 
b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
new file mode 100644
index 0000000..03668ce
--- /dev/null
+++ 
b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -0,0 +1,798 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.googleapis.batch.BatchRequest;
+import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.LowLevelHttpRequest;
+import com.google.api.client.http.LowLevelHttpResponse;
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.json.Json;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.client.testing.http.HttpTesting;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import com.google.api.client.util.BackOff;
+import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.model.Bucket;
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
+import com.google.cloud.hadoop.util.ClientRequestHelper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.ByteArrayInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.net.SocketTimeoutException;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.AccessDeniedException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/** Test case for {@link GcsUtil}. */
+@RunWith(JUnit4.class)
+public class GcsUtilTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testGlobTranslation() {
+    assertEquals("foo", GcsUtil.globToRegexp("foo"));
+    assertEquals("fo[^/]*o", GcsUtil.globToRegexp("fo*o"));
+    assertEquals("f[^/]*o\\.[^/]", GcsUtil.globToRegexp("f*o.?"));
+    assertEquals("foo-[0-9][^/]*", GcsUtil.globToRegexp("foo-[0-9]*"));
+  }
+
+  /** Creates a {@link GcsOptions} whose GCP credential is a fresh {@link TestCredential}. */
+  private static GcsOptions gcsOptionsWithTestCredential() {
+    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+    return options;
+  }
+
+  /** The credential installed by the helper must be readable back from the options. */
+  @Test
+  public void testCreationWithDefaultOptions() {
+    assertNotNull(gcsOptionsWithTestCredential().getGcpCredential());
+  }
+
+  /** With no buffer size configured, the utility reports a null upload buffer size. */
+  @Test
+  public void testUploadBufferSizeDefault() {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    assertNull(gcsUtil.getUploadBufferSizeBytes());
+  }
+
+  /** A buffer size set on the options is the one the utility reports. */
+  @Test
+  public void testUploadBufferSizeUserSpecified() {
+    GcsOptions options = gcsOptionsWithTestCredential();
+    options.setGcsUploadBufferSizeBytes(12345);
+
+    Integer reportedSize = options.getGcsUtil().getUploadBufferSizeBytes();
+    assertEquals(Integer.valueOf(12345), reportedSize);
+  }
+
+  /**
+   * Verifies that an executor service set on the options is the one handed to {@link GcsUtil}.
+   *
+   * <p>The pool created here is shut down once the assertion has run so the test does not
+   * leave an executor behind.
+   */
+  @Test
+  public void testCreationWithExecutorServiceProvided() {
+    ExecutorService executorService = Executors.newCachedThreadPool();
+    try {
+      GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+      pipelineOptions.setExecutorService(executorService);
+      assertSame(
+          pipelineOptions.getExecutorService(), pipelineOptions.getGcsUtil().executorService);
+    } finally {
+      // Release the pool even when the assertion fails.
+      executorService.shutdown();
+    }
+  }
+
+  /** A {@link GcsUtil} set explicitly on the options is returned unchanged. */
+  @Test
+  public void testCreationWithGcsUtilProvided() {
+    GcsUtil providedGcsUtil = Mockito.mock(GcsUtil.class);
+    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+    options.setGcsUtil(providedGcsUtil);
+    assertSame(providedGcsUtil, options.getGcsUtil());
+  }
+
+  /**
+   * Submits 100 tasks to the executor service obtained from default {@link GcsOptions} and
+   * checks that they all finish within ten seconds.
+   *
+   * <p>Task {@code i} blocks on latch {@code i} and, once released, counts down latch
+   * {@code i - 1}. Only the last latch is released by the test itself, so the tasks can only
+   * finish in the reverse of their submission order.
+   */
+  @Test
+  public void testMultipleThreadsCanCompleteOutOfOrderWithDefaultThreadPool() 
throws Exception {
+    GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
+    ExecutorService executorService = pipelineOptions.getExecutorService();
+
+    int numThreads = 100;
+    final CountDownLatch[] countDownLatches = new CountDownLatch[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      final int currentLatch = i;
+      countDownLatches[i] = new CountDownLatch(1);
+      executorService.execute(
+          new Runnable() {
+            @Override
+            public void run() {
+              // Wait for latch N and then release latch N - 1
+              try {
+                countDownLatches[currentLatch].await();
+                if (currentLatch > 0) {
+                  countDownLatches[currentLatch - 1].countDown();
+                }
+              } catch (InterruptedException e) {
+                // Restore the interrupt flag before failing the task.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    }
+
+    // Release the last latch starting the chain reaction.
+    countDownLatches[countDownLatches.length - 1].countDown();
+    // No further tasks are submitted; wait for the chain to unwind.
+    executorService.shutdown();
+    assertTrue("Expected tasks to complete",
+        executorService.awaitTermination(10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testGlobExpansion() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+    Storage.Objects.Get mockStorageGet = 
Mockito.mock(Storage.Objects.Get.class);
+    Storage.Objects.List mockStorageList = 
Mockito.mock(Storage.Objects.List.class);
+
+    Objects modelObjects = new Objects();
+    List<StorageObject> items = new ArrayList<>();
+    // A directory
+    items.add(new 
StorageObject().setBucket("testbucket").setName("testdirectory/"));
+
+    // Files within the directory
+    items.add(new 
StorageObject().setBucket("testbucket").setName("testdirectory/file1name"));
+    items.add(new 
StorageObject().setBucket("testbucket").setName("testdirectory/file2name"));
+    items.add(new 
StorageObject().setBucket("testbucket").setName("testdirectory/file3name"));
+    items.add(new 
StorageObject().setBucket("testbucket").setName("testdirectory/otherfile"));
+    items.add(new 
StorageObject().setBucket("testbucket").setName("testdirectory/anotherfile"));
+
+    modelObjects.setItems(items);
+
+    when(mockStorage.objects()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get("testbucket", 
"testdirectory/otherfile")).thenReturn(
+        mockStorageGet);
+    when(mockStorageObjects.list("testbucket")).thenReturn(mockStorageList);
+    when(mockStorageGet.execute()).thenReturn(
+        new 
StorageObject().setBucket("testbucket").setName("testdirectory/otherfile"));
+    when(mockStorageList.execute()).thenReturn(modelObjects);
+
+    // Test a single file.
+    {
+      GcsPath pattern = 
GcsPath.fromUri("gs://testbucket/testdirectory/otherfile");
+      List<GcsPath> expectedFiles =
+          
ImmutableList.of(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
+
+      assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
+    }
+
+    // Test patterns.
+    {
+      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*");
+      List<GcsPath> expectedFiles = ImmutableList.of(
+          GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
+
+      assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
+    }
+
+    {
+      GcsPath pattern = 
GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*");
+      List<GcsPath> expectedFiles = ImmutableList.of(
+          GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
+
+      assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
+    }
+
+    {
+      GcsPath pattern = 
GcsPath.fromUri("gs://testbucket/testdirectory/file?name");
+      List<GcsPath> expectedFiles = ImmutableList.of(
+          GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
+
+      assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
+    }
+
+    {
+      GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name");
+      List<GcsPath> expectedFiles = ImmutableList.of(
+          GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
+          GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
+
+      assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
+    }
+  }
+
+  // Recursive wildcards ('**') are not supported in patterns.
+  @Test
+  public void testRecursiveGlobExpansionFails() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    GcsPath recursivePattern = GcsPath.fromUri("gs://testbucket/test**");
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Unsupported wildcard usage");
+    gcsUtil.expand(recursivePattern);
+  }
+
+  // GcsUtil.expand() should not fail when matching a single object that does not exist.
+  // It should return the empty result since GCS get object is strongly consistent.
+  @Test
+  public void testNonExistentObjectReturnsEmptyResult() throws IOException {
+    GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+    GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(mockStorage);
+
+    Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
+    Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
+
+    GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/nonexistentfile");
+    // The stubbed GET answers with a 404 for the requested object.
+    GoogleJsonResponseException expectedException =
+        googleJsonResponseException(
+            HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist", "Nothing here to see");
+
+    when(mockStorage.objects()).thenReturn(mockStorageObjects);
+    when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject()))
+        .thenReturn(mockStorageGet);
+    when(mockStorageGet.execute()).thenThrow(expectedException);
+
+    // Typed empty list instead of the raw Collections.EMPTY_LIST constant.
+    assertEquals(Collections.<GcsPath>emptyList(), gcsUtil.expand(pattern));
+  }
+
+  // GcsUtil.expand() must surface errors other than "not found", such as access denied.
+  @Test
+  public void testAccessDeniedObjectThrowsIOException() throws IOException {
+    GcsPath deniedPath = GcsPath.fromUri("gs://testbucket/testdirectory/accessdeniedfile");
+    GoogleJsonResponseException forbidden =
+        googleJsonResponseException(
+            HttpStatusCodes.STATUS_CODE_FORBIDDEN,
+            "Waves hand mysteriously",
+            "These aren't the buckets you're looking for");
+
+    Storage.Objects.Get getRequest = Mockito.mock(Storage.Objects.Get.class);
+    when(getRequest.execute()).thenThrow(forbidden);
+
+    Storage.Objects objectsApi = Mockito.mock(Storage.Objects.class);
+    when(objectsApi.get(deniedPath.getBucket(), deniedPath.getObject()))
+        .thenReturn(getRequest);
+
+    Storage storageClient = Mockito.mock(Storage.class);
+    when(storageClient.objects()).thenReturn(objectsApi);
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    gcsUtil.setStorageClient(storageClient);
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage("Unable to get the file object for path");
+    gcsUtil.expand(deniedPath);
+  }
+
+  /** A single (non-batched) size lookup returns the size carried by the stubbed GET result. */
+  @Test
+  public void testFileSizeNonBatch() throws Exception {
+    Storage.Objects.Get getRequest = Mockito.mock(Storage.Objects.Get.class);
+    when(getRequest.execute())
+        .thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000)));
+
+    Storage.Objects objectsApi = Mockito.mock(Storage.Objects.class);
+    when(objectsApi.get("testbucket", "testobject")).thenReturn(getRequest);
+
+    Storage storageClient = Mockito.mock(Storage.class);
+    when(storageClient.objects()).thenReturn(objectsApi);
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    gcsUtil.setStorageClient(storageClient);
+
+    assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject")));
+  }
+
+  /** A 404 answer to a single size lookup is reported as {@link FileNotFoundException}. */
+  @Test
+  public void testFileSizeWhenFileNotFoundNonBatch() throws Exception {
+    // The transport answers with an empty 404 response.
+    MockLowLevelHttpResponse emptyNotFound = new MockLowLevelHttpResponse();
+    emptyNotFound.setStatusCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND);
+    emptyNotFound.setContent("");
+    MockHttpTransport transport =
+        new MockHttpTransport.Builder().setLowLevelHttpResponse(emptyNotFound).build();
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    gcsUtil.setStorageClient(new Storage(transport, Transport.getJsonFactory(), null));
+
+    thrown.expect(FileNotFoundException.class);
+    gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"));
+  }
+
+  /**
+   * Two socket timeouts followed by a success: the object is still returned, and the
+   * two-retry back-off is exhausted afterwards.
+   */
+  @Test
+  public void testRetryFileSizeNonBatch() throws IOException {
+    Storage.Objects.Get getRequest = Mockito.mock(Storage.Objects.Get.class);
+    when(getRequest.execute())
+        .thenThrow(new SocketTimeoutException("SocketException"))
+        .thenThrow(new SocketTimeoutException("SocketException"))
+        .thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000)));
+
+    Storage.Objects objectsApi = Mockito.mock(Storage.Objects.class);
+    when(objectsApi.get("testbucket", "testobject")).thenReturn(getRequest);
+
+    Storage storageClient = Mockito.mock(Storage.class);
+    when(storageClient.objects()).thenReturn(objectsApi);
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    gcsUtil.setStorageClient(storageClient);
+
+    BackOff backOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff();
+    GcsPath path = GcsPath.fromComponents("testbucket", "testobject");
+
+    assertEquals(
+        1000,
+        gcsUtil.getObject(path, backOff, new FastNanoClockAndSleeper()).getSize().longValue());
+    assertEquals(BackOff.STOP, backOff.nextBackOffMillis());
+  }
+
+  /**
+   * Serves a canned multipart batch response whose envelope is HTTP 200 but whose single
+   * part is a 404, and expects {@code fileSizes} to throw {@link FileNotFoundException}.
+   */
+  @Test
+  public void testGetSizeBytesWhenFileNotFoundBatch() throws Exception {
+    JsonFactory jsonFactory = new JacksonFactory();
+
+    String contentBoundary = "batch_foobarbaz";
+    String contentBoundaryLine = "--" + contentBoundary;
+    String endOfContentBoundaryLine = "--" + contentBoundary + "--";
+
+    // JSON error body carried inside the 404 part.
+    GenericJson error = new GenericJson()
+        .set("error", new GenericJson().set("code", 404));
+    error.setFactory(jsonFactory);
+
+    // One part between the opening boundary line and the closing boundary line.
+    String content = contentBoundaryLine + "\n"
+        + "Content-Type: application/http\n"
+        + "\n"
+        + "HTTP/1.1 404 Not Found\n"
+        + "Content-Length: -1\n"
+        + "\n"
+        + error.toString()
+        + "\n"
+        + "\n"
+        + endOfContentBoundaryLine
+        + "\n";
+    thrown.expect(FileNotFoundException.class);
+    // The outer response itself reports 200; only the embedded part is a 404.
+    MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse()
+        .setContentType("multipart/mixed; boundary=" + contentBoundary)
+        .setContent(content)
+        .setStatusCode(HttpStatusCodes.STATUS_CODE_OK);
+
+    MockHttpTransport mockTransport =
+        new 
MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build();
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+    gcsUtil.setStorageClient(new Storage(mockTransport, 
Transport.getJsonFactory(), null));
+    gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", 
"testobject")));
+  }
+
+  /**
+   * Like the non-retry batch test, but the mocked response first reports status 429 and
+   * then 200 with the multipart 404 payload. The storage client is built with a
+   * {@link RetryHttpRequestInitializer}, and the test still expects
+   * {@link FileNotFoundException}.
+   */
+  @Test
+  public void testGetSizeBytesWhenFileNotFoundBatchRetry() throws Exception {
+    JsonFactory jsonFactory = new JacksonFactory();
+
+    String contentBoundary = "batch_foobarbaz";
+    String contentBoundaryLine = "--" + contentBoundary;
+    String endOfContentBoundaryLine = "--" + contentBoundary + "--";
+
+    // JSON error body carried inside the 404 part.
+    GenericJson error = new GenericJson()
+        .set("error", new GenericJson().set("code", 404));
+    error.setFactory(jsonFactory);
+
+    String content = contentBoundaryLine + "\n"
+        + "Content-Type: application/http\n"
+        + "\n"
+        + "HTTP/1.1 404 Not Found\n"
+        + "Content-Length: -1\n"
+        + "\n"
+        + error.toString()
+        + "\n"
+        + "\n"
+        + endOfContentBoundaryLine
+        + "\n";
+    thrown.expect(FileNotFoundException.class);
+
+    final LowLevelHttpResponse mockResponse = 
Mockito.mock(LowLevelHttpResponse.class);
+    when(mockResponse.getContentType()).thenReturn("multipart/mixed; 
boundary=" + contentBoundary);
+
+    // 429: Too many requests, then 200: OK.
+    when(mockResponse.getStatusCode()).thenReturn(429, 200);
+    // Content is stubbed as a sequence too: "error" first, the multipart payload second.
+    when(mockResponse.getContent()).thenReturn(toStream("error"), 
toStream(content));
+
+    // A mock transport that lets us mock the API responses.
+    MockHttpTransport mockTransport =
+        new MockHttpTransport.Builder()
+            .setLowLevelHttpRequest(
+                new MockLowLevelHttpRequest() {
+                  @Override
+                  public LowLevelHttpResponse execute() throws IOException {
+                    // The same sequenced mock response is returned on every execute() call.
+                    return mockResponse;
+                  }
+                })
+            .build();
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+        gcsUtil.setStorageClient(
+        new Storage(mockTransport, Transport.getJsonFactory(), new 
RetryHttpRequestInitializer()));
+    gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", 
"testobject")));
+  }
+
+  /** Bucket creation completes when the insert first times out and then succeeds. */
+  @Test
+  public void testCreateBucket() throws IOException {
+    Storage.Buckets.Insert insertRequest = Mockito.mock(Storage.Buckets.Insert.class);
+    when(insertRequest.execute())
+        .thenThrow(new SocketTimeoutException("SocketException"))
+        .thenReturn(new Bucket());
+
+    Storage.Buckets bucketsApi = Mockito.mock(Storage.Buckets.class);
+    when(bucketsApi.insert(any(String.class), any(Bucket.class))).thenReturn(insertRequest);
+
+    Storage storageClient = Mockito.mock(Storage.class);
+    when(storageClient.buckets()).thenReturn(bucketsApi);
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    gcsUtil.setStorageClient(storageClient);
+
+    BackOff backOff = FluentBackoff.DEFAULT.backoff();
+    gcsUtil.createBucket("a", new Bucket(), backOff, new FastNanoClockAndSleeper());
+  }
+
+  /** A 403 from the insert call is reported as {@link AccessDeniedException}. */
+  @Test
+  public void testCreateBucketAccessErrors() throws IOException {
+    GoogleJsonResponseException forbidden =
+        googleJsonResponseException(
+            HttpStatusCodes.STATUS_CODE_FORBIDDEN,
+            "Waves hand mysteriously",
+            "These aren't the buckets you're looking for");
+
+    Storage.Buckets.Insert insertRequest = Mockito.mock(Storage.Buckets.Insert.class);
+    when(insertRequest.execute()).thenThrow(forbidden);
+
+    Storage.Buckets bucketsApi = Mockito.mock(Storage.Buckets.class);
+    when(bucketsApi.insert(any(String.class), any(Bucket.class))).thenReturn(insertRequest);
+
+    Storage storageClient = Mockito.mock(Storage.class);
+    when(storageClient.buckets()).thenReturn(bucketsApi);
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    gcsUtil.setStorageClient(storageClient);
+
+    BackOff backOff = FluentBackoff.DEFAULT.backoff();
+
+    thrown.expect(AccessDeniedException.class);
+
+    gcsUtil.createBucket("a", new Bucket(), backOff, new FastNanoClockAndSleeper());
+  }
+
+  /** The bucket counts as accessible when the GET times out once and then succeeds. */
+  @Test
+  public void testBucketAccessible() throws IOException {
+    Storage.Buckets.Get getRequest = Mockito.mock(Storage.Buckets.Get.class);
+    when(getRequest.execute())
+        .thenThrow(new SocketTimeoutException("SocketException"))
+        .thenReturn(new Bucket());
+
+    Storage.Buckets bucketsApi = Mockito.mock(Storage.Buckets.class);
+    when(bucketsApi.get("testbucket")).thenReturn(getRequest);
+
+    Storage storageClient = Mockito.mock(Storage.class);
+    when(storageClient.buckets()).thenReturn(bucketsApi);
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    gcsUtil.setStorageClient(storageClient);
+
+    BackOff backOff = FluentBackoff.DEFAULT.backoff();
+    GcsPath path = GcsPath.fromComponents("testbucket", "testobject");
+
+    assertTrue(gcsUtil.bucketAccessible(path, backOff, new FastNanoClockAndSleeper()));
+  }
+
+  /** A 403 from the bucket GET makes the bucket count as not accessible. */
+  @Test
+  public void testBucketDoesNotExistBecauseOfAccessError() throws IOException {
+    GoogleJsonResponseException forbidden =
+        googleJsonResponseException(
+            HttpStatusCodes.STATUS_CODE_FORBIDDEN,
+            "Waves hand mysteriously",
+            "These aren't the buckets you're looking for");
+
+    Storage.Buckets.Get getRequest = Mockito.mock(Storage.Buckets.Get.class);
+    when(getRequest.execute()).thenThrow(forbidden);
+
+    Storage.Buckets bucketsApi = Mockito.mock(Storage.Buckets.class);
+    when(bucketsApi.get("testbucket")).thenReturn(getRequest);
+
+    Storage storageClient = Mockito.mock(Storage.class);
+    when(storageClient.buckets()).thenReturn(bucketsApi);
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    gcsUtil.setStorageClient(storageClient);
+
+    BackOff backOff = FluentBackoff.DEFAULT.backoff();
+    GcsPath path = GcsPath.fromComponents("testbucket", "testobject");
+
+    assertFalse(gcsUtil.bucketAccessible(path, backOff, new FastNanoClockAndSleeper()));
+  }
+
+  /** A 404 from the bucket GET makes the bucket count as not accessible. */
+  @Test
+  public void testBucketDoesNotExist() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    Storage storageClient = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(storageClient);
+
+    Storage.Buckets bucketsApi = Mockito.mock(Storage.Buckets.class);
+    Storage.Buckets.Get getRequest = Mockito.mock(Storage.Buckets.Get.class);
+    BackOff backOff = FluentBackoff.DEFAULT.backoff();
+
+    when(storageClient.buckets()).thenReturn(bucketsApi);
+    when(bucketsApi.get("testbucket")).thenReturn(getRequest);
+    when(getRequest.execute())
+        .thenThrow(
+            googleJsonResponseException(
+                HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist", "Nothing here to see"));
+
+    GcsPath path = GcsPath.fromComponents("testbucket", "testobject");
+    assertFalse(gcsUtil.bucketAccessible(path, backOff, new FastNanoClockAndSleeper()));
+  }
+
+  /** Fetching a bucket returns a non-null result when the GET times out once, then succeeds. */
+  @Test
+  public void testGetBucket() throws IOException {
+    Storage.Buckets.Get getRequest = Mockito.mock(Storage.Buckets.Get.class);
+    when(getRequest.execute())
+        .thenThrow(new SocketTimeoutException("SocketException"))
+        .thenReturn(new Bucket());
+
+    Storage.Buckets bucketsApi = Mockito.mock(Storage.Buckets.class);
+    when(bucketsApi.get("testbucket")).thenReturn(getRequest);
+
+    Storage storageClient = Mockito.mock(Storage.class);
+    when(storageClient.buckets()).thenReturn(bucketsApi);
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    gcsUtil.setStorageClient(storageClient);
+
+    BackOff backOff = FluentBackoff.DEFAULT.backoff();
+    GcsPath path = GcsPath.fromComponents("testbucket", "testobject");
+
+    assertNotNull(gcsUtil.getBucket(path, backOff, new FastNanoClockAndSleeper()));
+  }
+
+  /** A 404 from the bucket GET is reported as {@link FileNotFoundException}. */
+  @Test
+  public void testGetBucketNotExists() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    Storage storageClient = Mockito.mock(Storage.class);
+    gcsUtil.setStorageClient(storageClient);
+
+    Storage.Buckets bucketsApi = Mockito.mock(Storage.Buckets.class);
+    Storage.Buckets.Get getRequest = Mockito.mock(Storage.Buckets.Get.class);
+    BackOff backOff = FluentBackoff.DEFAULT.backoff();
+
+    when(storageClient.buckets()).thenReturn(bucketsApi);
+    when(bucketsApi.get("testbucket")).thenReturn(getRequest);
+    when(getRequest.execute())
+        .thenThrow(
+            googleJsonResponseException(
+                HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist", "Nothing here to see"));
+
+    GcsPath path = GcsPath.fromComponents("testbucket", "testobject");
+
+    thrown.expect(FileNotFoundException.class);
+    thrown.expectMessage("It don't exist");
+    gcsUtil.getBucket(path, backOff, new FastNanoClockAndSleeper());
+  }
+
+  /** Closing the same read channel twice must not throw. */
+  @Test
+  public void testGCSChannelCloseIdempotent() throws IOException {
+    ClientRequestHelper<StorageObject> requestHelper = new ClientRequestHelper<StorageObject>();
+    SeekableByteChannel channel =
+        new GoogleCloudStorageReadChannel(
+            null, "dummybucket", "dummyobject", null, requestHelper);
+    for (int closeCount = 0; closeCount < 2; closeCount++) {
+      channel.close();
+    }
+  }
+
+  /**
+   * Builds a fake GoogleJsonResponseException for testing API error handling.
+   *
+   * <p>A mock transport answers a GET with a JSON error document, and the exception is
+   * created from that response.
+   *
+   * @param status the HTTP status code of the response, also stored as the error "code"
+   * @param reason the reason set on the single error entry
+   * @param message the message set on the single error entry
+   * @return the exception created from the mock response
+   * @throws IOException declared by the request build and execute calls used here
+   */
+  private static GoogleJsonResponseException googleJsonResponseException(
+      final int status, final String reason, final String message) throws 
IOException {
+    final JsonFactory jsonFactory = new JacksonFactory();
+    HttpTransport transport = new MockHttpTransport() {
+      @Override
+      public LowLevelHttpRequest buildRequest(String method, String url) 
throws IOException {
+        // Assemble {"error": {"code": status, "errors": [{reason, message}]}}.
+        ErrorInfo errorInfo = new ErrorInfo();
+        errorInfo.setReason(reason);
+        errorInfo.setMessage(message);
+        errorInfo.setFactory(jsonFactory);
+        GenericJson error = new GenericJson();
+        error.set("code", status);
+        error.set("errors", Arrays.asList(errorInfo));
+        error.setFactory(jsonFactory);
+        GenericJson errorResponse = new GenericJson();
+        errorResponse.set("error", error);
+        errorResponse.setFactory(jsonFactory);
+        return new MockLowLevelHttpRequest().setResponse(
+            new 
MockLowLevelHttpResponse().setContent(errorResponse.toPrettyString())
+            .setContentType(Json.MEDIA_TYPE).setStatusCode(status));
+        }
+    };
+    HttpRequest request =
+        
transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL);
+    // Keep execute() from throwing on the error status so the response can be converted below.
+    request.setThrowExceptionOnExecuteError(false);
+    HttpResponse response = request.execute();
+    return GoogleJsonResponseException.from(jsonFactory, response);
+  }
+
+  /** Returns {@code n} URIs of the form {@code gs://bucket/<s><i>} for i = 0 .. n-1. */
+  private static List<String> makeStrings(String s, int n) {
+    ImmutableList.Builder<String> uris = ImmutableList.builder();
+    int index = 0;
+    while (index < n) {
+      uris.add(String.format("gs://bucket/%s%d", s, index));
+      index++;
+    }
+    return uris.build();
+  }
+
+  /** Returns {@code n} paths parsed from {@code gs://bucket/<s><i>} for i = 0 .. n-1. */
+  private static List<GcsPath> makeGcsPaths(String s, int n) {
+    ImmutableList.Builder<GcsPath> paths = ImmutableList.builder();
+    int index = 0;
+    while (index < n) {
+      String uri = String.format("gs://bucket/%s%d", s, index);
+      paths.add(GcsPath.fromUri(uri));
+      index++;
+    }
+    return paths.build();
+  }
+
+  /** Sums the sizes of all batches, asserting along the way that no batch is empty. */
+  private static int sumBatchSizes(List<BatchRequest> batches) {
+    int total = 0;
+    for (BatchRequest batch : batches) {
+      total = total + batch.size();
+      assertThat(batch.size(), greaterThan(0));
+    }
+    return total;
+  }
+
+  @Test
+  public void testMakeCopyBatches() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+    // Small number of files fits in 1 batch
+    List<BatchRequest> batches = gcsUtil.makeCopyBatches(makeStrings("s", 3), 
makeStrings("d", 3));
+    assertThat(batches.size(), equalTo(1));
+    assertThat(sumBatchSizes(batches), equalTo(3));
+
+    // 1 batch of files fits in 1 batch
+    batches = gcsUtil.makeCopyBatches(makeStrings("s", 100), makeStrings("d", 
100));
+    assertThat(batches.size(), equalTo(1));
+    assertThat(sumBatchSizes(batches), equalTo(100));
+
+    // A little more than 5 batches of files fits in 6 batches
+    batches = gcsUtil.makeCopyBatches(makeStrings("s", 501), makeStrings("d", 
501));
+    assertThat(batches.size(), equalTo(6));
+    assertThat(sumBatchSizes(batches), equalTo(501));
+  }
+
+  /** Mismatched source and destination counts are rejected. */
+  @Test
+  public void testInvalidCopyBatches() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    List<String> threeSources = makeStrings("s", 3);
+    List<String> oneDestination = makeStrings("d", 1);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Number of source files 3");
+
+    gcsUtil.makeCopyBatches(threeSources, oneDestination);
+  }
+
+  @Test
+  public void testMakeRemoveBatches() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+    // Small number of files fits in 1 batch
+    List<BatchRequest> batches = gcsUtil.makeRemoveBatches(makeStrings("s", 
3));
+    assertThat(batches.size(), equalTo(1));
+    assertThat(sumBatchSizes(batches), equalTo(3));
+
+    // 1 batch of files fits in 1 batch
+    batches = gcsUtil.makeRemoveBatches(makeStrings("s", 100));
+    assertThat(batches.size(), equalTo(1));
+    assertThat(sumBatchSizes(batches), equalTo(100));
+
+    // A little more than 5 batches of files fits in 6 batches
+    batches = gcsUtil.makeRemoveBatches(makeStrings("s", 501));
+    assertThat(batches.size(), equalTo(6));
+    assertThat(sumBatchSizes(batches), equalTo(501));
+  }
+
+  /**
+   * Checks how many get batches are produced for 3, 100 and 501 paths, and that the results
+   * list receives one entry per path.
+   */
+  @Test
+  public void testMakeGetBatches() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+    // Each row is {number of paths, expected number of batches}: a small number of paths
+    // and exactly 100 paths each fit in 1 batch, while 501 paths need 6 batches.
+    int[][] cases = {{3, 1}, {100, 1}, {501, 6}};
+    for (int[] pathAndBatchCounts : cases) {
+      int pathCount = pathAndBatchCounts[0];
+      // A fresh results list for every case.
+      List<StorageObjectOrIOException[]> results = Lists.newArrayList();
+      List<BatchRequest> batches = gcsUtil.makeGetBatches(makeGcsPaths("s", pathCount), results);
+      assertThat(batches.size(), equalTo(pathAndBatchCounts[1]));
+      assertThat(sumBatchSizes(batches), equalTo(pathCount));
+      assertEquals(pathCount, results.size());
+    }
+  }
+
+  /**
+   * Wraps {@code content}, encoded as UTF-8, in an in-memory {@link InputStream}.
+   *
+   * @param content the text to expose as a stream
+   * @return a stream over the UTF-8 bytes of {@code content}
+   */
+  private static InputStream toStream(String content) {
+    return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java
 
b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java
new file mode 100644
index 0000000..8e7878c
--- /dev/null
+++ 
b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link IntervalBoundedExponentialBackOff}.
+ *
+ * <p>Judging by the validation messages expected below, the first constructor argument is the
+ * maximum interval and the second one is the initial interval (presumably both in milliseconds).
+ *
+ * <p>The backoffs under test are built with a maximum of 1000 and an initial interval of 500.
+ * The asserted ranges are centered on 500, 750 and then 1000, each with a spread of about 50% on
+ * either side: the nominal interval is expected to stop growing once it reaches the maximum.
+ */
+@RunWith(JUnit4.class)
+public class IntervalBoundedExponentialBackOffTest {
+  @Rule public ExpectedException exception = ExpectedException.none();
+
+
+  @Test
+  public void testUsingInvalidInitialInterval() throws Exception {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Initial interval must be greater than zero.");
+    new IntervalBoundedExponentialBackOff(1000L, 0L);
+  }
+
+  @Test
+  public void testUsingInvalidMaximumInterval() throws Exception {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Maximum interval must be greater than zero.");
+    new IntervalBoundedExponentialBackOff(-1L, 10L);
+  }
+
+  @Test
+  public void testThatcertainNumberOfAttemptsReachesMaxInterval() throws 
Exception {
+    IntervalBoundedExponentialBackOff backOff = new 
IntervalBoundedExponentialBackOff(1000L, 500);
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), 
lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), 
lessThan(1126L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+  }
+
+  @Test
+  public void testThatResettingAllowsReuse() throws Exception {
+    IntervalBoundedExponentialBackOff backOff = new 
IntervalBoundedExponentialBackOff(1000L, 500);
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), 
lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), 
lessThan(1126L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+    backOff.reset();
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), 
lessThan(751L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), 
lessThan(1126L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+  }
+
+  @Test
+  public void testAtMaxInterval() throws Exception {
+    IntervalBoundedExponentialBackOff backOff = new 
IntervalBoundedExponentialBackOff(1000L, 500);
+    assertFalse(backOff.atMaxInterval());
+    backOff.nextBackOffMillis();
+    assertFalse(backOff.atMaxInterval());
+    backOff.nextBackOffMillis();
+    assertTrue(backOff.atMaxInterval());
+    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L),
+        lessThanOrEqualTo(1500L)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
 
b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
new file mode 100644
index 0000000..71554b5
--- /dev/null
+++ 
b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpResponseException;
+import com.google.api.client.http.HttpResponseInterceptor;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.LowLevelHttpRequest;
+import com.google.api.client.http.LowLevelHttpResponse;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.util.NanoClock;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.Storage.Objects.Get;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.security.PrivateKey;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests for RetryHttpRequestInitializer.
+ */
+@RunWith(JUnit4.class)
+public class RetryHttpRequestInitializerTest {
+
+  @Mock private Credential mockCredential;
+  @Mock private PrivateKey mockPrivateKey;
+  @Mock private LowLevelHttpRequest mockLowLevelRequest;
+  @Mock private LowLevelHttpResponse mockLowLevelResponse;
+  @Mock private HttpResponseInterceptor mockHttpResponseInterceptor;
+
+  private final JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
+  private Storage storage;
+
+  /**
+   * A {@link NanoClock} that replays a fixed schedule of timestamps. Used to test retrying a
+   * request more than the default 10 times.
+   *
+   * <p>Each entry of {@code timesMs} is a time in milliseconds and is returned, converted to
+   * nanoseconds, by two consecutive {@link #nanoTime()} calls. The schedule holds 17 entries, so
+   * a 35th call throws {@link ArrayIndexOutOfBoundsException}.
+   */
+  static class MockNanoClock implements NanoClock {
+    private final int[] timesMs = {500, 750, 1125, 1688, 2531, 3797, 5695, 8543,
+        12814, 19222, 28833, 43249, 64873, 97310, 145965, 218945, 328420};
+    private int i = 0;
+
+    @Override
+    public long nanoTime() {
+      // Multiply by a long constant: an int product overflows from the fifth entry on
+      // (2531 * 1000000 > Integer.MAX_VALUE) and would only be widened after wrapping around.
+      return timesMs[i++ / 2] * 1000000L;
+    }
+  }
+
+  /**
+   * Initializes the {@code @Mock} fields and builds the {@link Storage} client under test.
+   *
+   * <p>The client's transport hands back {@code mockLowLevelRequest} for every request, and its
+   * requests are initialized by a {@link RetryHttpRequestInitializer} that is given the mock
+   * credential, a {@link MockNanoClock}, a {@link Sleeper} that does not sleep, and the mock
+   * response interceptor.
+   */
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+
+    HttpTransport lowLevelTransport = new HttpTransport() {
+      @Override
+      protected LowLevelHttpRequest buildRequest(String method, String url)
+          throws IOException {
+        return mockLowLevelRequest;
+      }
+    };
+
+    // Retry initializer will pass through to credential, since we can have
+    // only a single HttpRequestInitializer, and we use multiple Credential
+    // types in the SDK, not all of which allow for retry configuration.
+    RetryHttpRequestInitializer initializer = new RetryHttpRequestInitializer(
+        mockCredential, new MockNanoClock(), new Sleeper() {
+          @Override
+          public void sleep(long millis) throws InterruptedException {}
+        }, Arrays.asList(418 /* I'm a teapot */), mockHttpResponseInterceptor);
+    storage = new Storage.Builder(lowLevelTransport, jsonFactory, initializer)
+        .setApplicationName("test").build();
+  }
+
+  /**
+   * Fails a test that left any interaction with these mocks unverified. Note that
+   * {@code mockLowLevelResponse} is not part of this check.
+   */
+  @After
+  public void tearDown() {
+    verifyNoMoreInteractions(mockPrivateKey);
+    verifyNoMoreInteractions(mockLowLevelRequest);
+    verifyNoMoreInteractions(mockCredential);
+    verifyNoMoreInteractions(mockHttpResponseInterceptor);
+  }
+
+  /**
+   * Tests that a request answered with status 200 is executed exactly once and that the
+   * credential and the response interceptor each see it once.
+   */
+  @Test
+  public void testBasicOperation() throws IOException {
+    when(mockLowLevelRequest.execute())
+        .thenReturn(mockLowLevelResponse);
+    when(mockLowLevelResponse.getStatusCode())
+        .thenReturn(200);
+
+    Storage.Buckets.Get result = storage.buckets().get("test");
+    HttpResponse response = result.executeUnparsed();
+    assertNotNull(response);
+
+    verify(mockCredential).initialize(any(HttpRequest.class));
+    
verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
+    verify(mockLowLevelRequest, atLeastOnce())
+        .addHeader(anyString(), anyString());
+    verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt());
+    verify(mockLowLevelRequest).execute();
+    verify(mockLowLevelResponse).getStatusCode();
+  }
+
+  /**
+   * Tests that a non-retriable error is not retried.
+   *
+   * <p>The first status code is 403. The verifications require exactly one {@code execute()} and
+   * one {@code getStatusCode()} call, so the stubbed 200 must never be reached.
+   *
+   * <p>NOTE(review): the try block contains no {@code fail()}, so this test does not by itself
+   * require that an {@link HttpResponseException} is thrown; it only checks the message if one is.
+   */
+  @Test
+  public void testErrorCodeForbidden() throws IOException {
+    when(mockLowLevelRequest.execute())
+        .thenReturn(mockLowLevelResponse);
+    when(mockLowLevelResponse.getStatusCode())
+        .thenReturn(403)  // Non-retryable error.
+        .thenReturn(200); // Shouldn't happen.
+
+    try {
+      Storage.Buckets.Get result = storage.buckets().get("test");
+      HttpResponse response = result.executeUnparsed();
+      assertNotNull(response);
+    } catch (HttpResponseException e) {
+      Assert.assertThat(e.getMessage(), Matchers.containsString("403"));
+    }
+
+    verify(mockCredential).initialize(any(HttpRequest.class));
+    
verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
+    verify(mockLowLevelRequest, atLeastOnce())
+        .addHeader(anyString(), anyString());
+    verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt());
+    verify(mockLowLevelRequest).execute();
+    verify(mockLowLevelResponse).getStatusCode();
+  }
+
+  /**
+   * Tests that a retriable error is retried.
+   *
+   * <p>The stubbed status codes are 503, then 429, then 200; the request is expected to be
+   * executed three times while the credential and the response interceptor are each invoked once.
+   */
+  @Test
+  public void testRetryableError() throws IOException {
+    when(mockLowLevelRequest.execute())
+        .thenReturn(mockLowLevelResponse)
+        .thenReturn(mockLowLevelResponse)
+        .thenReturn(mockLowLevelResponse);
+    when(mockLowLevelResponse.getStatusCode())
+        .thenReturn(503)  // Retryable
+        .thenReturn(429)  // We also retry on 429 Too Many Requests.
+        .thenReturn(200);
+
+    Storage.Buckets.Get result = storage.buckets().get("test");
+    HttpResponse response = result.executeUnparsed();
+    assertNotNull(response);
+
+    verify(mockCredential).initialize(any(HttpRequest.class));
+    
verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
+    verify(mockLowLevelRequest, atLeastOnce())
+        .addHeader(anyString(), anyString());
+    verify(mockLowLevelRequest, times(3)).setTimeout(anyInt(), anyInt());
+    verify(mockLowLevelRequest, times(3)).execute();
+    verify(mockLowLevelResponse, times(3)).getStatusCode();
+  }
+
+  /**
+   * Tests that an IOException is retried.
+   *
+   * <p>The first {@code execute()} call throws an {@link IOException} and the second one returns
+   * a response with status 200, so two executions but only one status check are expected.
+   */
+  @Test
+  public void testThrowIOException() throws IOException {
+    when(mockLowLevelRequest.execute())
+        .thenThrow(new IOException("Fake Error"))
+        .thenReturn(mockLowLevelResponse);
+    when(mockLowLevelResponse.getStatusCode())
+        .thenReturn(200);
+
+    Storage.Buckets.Get result = storage.buckets().get("test");
+    HttpResponse response = result.executeUnparsed();
+    assertNotNull(response);
+
+    verify(mockCredential).initialize(any(HttpRequest.class));
+    
verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
+    verify(mockLowLevelRequest, atLeastOnce())
+        .addHeader(anyString(), anyString());
+    verify(mockLowLevelRequest, times(2)).setTimeout(anyInt(), anyInt());
+    verify(mockLowLevelRequest, times(2)).execute();
+    verify(mockLowLevelResponse).getStatusCode();
+  }
+
+  /**
+   * Tests that a retryable error is retried enough times.
+   *
+   * <p>The stubbed status code is 503 for the first {@code retries - 1} checks and 200 afterwards,
+   * so exactly {@code retries} executions are expected before the request succeeds.
+   */
+  @Test
+  public void testRetryableErrorRetryEnoughTimes() throws IOException {
+    when(mockLowLevelRequest.execute()).thenReturn(mockLowLevelResponse);
+    final int retries = 10;
+    when(mockLowLevelResponse.getStatusCode()).thenAnswer(new 
Answer<Integer>(){
+      int n = 0;
+      @Override
+      public Integer answer(InvocationOnMock invocation) {
+        return (n++ < retries - 1) ? 503 : 200;
+      }});
+
+    Storage.Buckets.Get result = storage.buckets().get("test");
+    HttpResponse response = result.executeUnparsed();
+    assertNotNull(response);
+
+    verify(mockCredential).initialize(any(HttpRequest.class));
+    
verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
+    verify(mockLowLevelRequest, atLeastOnce()).addHeader(anyString(),
+        anyString());
+    verify(mockLowLevelRequest, times(retries)).setTimeout(anyInt(), anyInt());
+    verify(mockLowLevelRequest, times(retries)).execute();
+    verify(mockLowLevelResponse, times(retries)).getStatusCode();
+  }
+
+  /**
+   * Tests that when RPCs fail with {@link SocketTimeoutException}, the IO exception handler
+   * is invoked.
+   *
+   * <p>The request is expected to be executed {@code 1 + defaultNumberOfRetries} times before the
+   * exception reaches the caller.
+   *
+   * <p>NOTE(review): {@code catch (Throwable e)} also catches the {@code AssertionError} raised
+   * by {@code fail()}, so an unexpected success is reported by the {@code instanceOf} assertion
+   * rather than by {@code fail()} itself.
+   */
+  @Test
+  public void testIOExceptionHandlerIsInvokedOnTimeout() throws Exception {
+    // Counts the number of calls to execute the HTTP request.
+    final AtomicLong executeCount = new AtomicLong();
+
+    // 10 is a private internal constant in the Google API Client library. See
+    // com.google.api.client.http.HttpRequest#setNumberOfRetries
+    // TODO: update this test once the private internal constant is public.
+    final int defaultNumberOfRetries = 10;
+
+    // A mock HTTP request that always throws SocketTimeoutException.
+    MockHttpTransport transport =
+        new MockHttpTransport.Builder().setLowLevelHttpRequest(new 
MockLowLevelHttpRequest() {
+          @Override
+          public LowLevelHttpResponse execute() throws IOException {
+            executeCount.incrementAndGet();
+            throw new SocketTimeoutException("Fake forced timeout exception");
+          }
+        }).build();
+
+    // A sample Google Cloud Storage client that uses the mock transport above together with
+    // the JSON factory from Transport and a default RetryHttpRequestInitializer.
+    Storage storage = new Storage.Builder(
+        transport, Transport.getJsonFactory(), new 
RetryHttpRequestInitializer()).build();
+
+    Get getRequest = storage.objects().get("gs://fake", "file");
+
+    try {
+      getRequest.execute();
+      fail();
+    } catch (Throwable e) {
+      assertThat(e, 
Matchers.<Throwable>instanceOf(SocketTimeoutException.class));
+      assertEquals(1 + defaultNumberOfRetries, executeCount.get());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index 74aba0f..dde8be5 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -32,6 +32,7 @@
   <name>Apache Beam :: SDKs :: Java :: Extensions</name>
 
   <modules>
+    <module>gcp-core</module>
     <module>jackson</module>
     <module>join-library</module>
     <module>sorter</module>

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index 3d14401..76ceb3d 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -73,6 +73,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-core-java</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml 
b/sdks/java/io/google-cloud-platform/pom.xml
index d22c6c5..4cd0337 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -66,7 +66,10 @@
       <artifactId>beam-sdks-java-core</artifactId>
     </dependency>
 
-    <!-- Build dependencies -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-extensions-gcp-core</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
@@ -94,12 +97,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.auto.service</groupId>
-      <artifactId>auto-service</artifactId>
-      <optional>true</optional>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.cloud.bigdataoss</groupId>
       <artifactId>util</artifactId>
     </dependency>
@@ -232,13 +229,20 @@
       <artifactId>avro</artifactId>
     </dependency>
 
+    <!-- Build dependencies -->
     <dependency>
       <groupId>com.google.auto.value</groupId>
       <artifactId>auto-value</artifactId>
       <scope>provided</scope>
     </dependency>
 
-    <!--  test -->
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+    <!--  Test dependencies -->
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>

Reply via email to