Repository: beam Updated Branches: refs/heads/master 9114eb3ee -> 4c3174b80
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java new file mode 100644 index 0000000..6ffcaeb --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java @@ -0,0 +1,799 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +import com.google.api.client.googleapis.batch.BatchRequest; +import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpStatusCodes; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.LowLevelHttpRequest; +import com.google.api.client.http.LowLevelHttpResponse; +import com.google.api.client.json.GenericJson; +import com.google.api.client.json.Json; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.api.client.testing.http.HttpTesting; +import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.client.testing.http.MockLowLevelHttpRequest; +import com.google.api.client.testing.http.MockLowLevelHttpResponse; +import com.google.api.client.util.BackOff; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.Bucket; +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel; +import com.google.cloud.hadoop.util.ClientRequestHelper; +import com.google.common.collect.ImmutableList; 
+import com.google.common.collect.Lists; +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; +import java.net.SocketTimeoutException; +import java.nio.channels.SeekableByteChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.AccessDeniedException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; +import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +/** Test case for {@link GcsUtil}. 
*/ +@RunWith(JUnit4.class) +public class GcsUtilTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testGlobTranslation() { + assertEquals("foo", GcsUtil.globToRegexp("foo")); + assertEquals("fo[^/]*o", GcsUtil.globToRegexp("fo*o")); + assertEquals("f[^/]*o\\.[^/]", GcsUtil.globToRegexp("f*o.?")); + assertEquals("foo-[0-9][^/]*", GcsUtil.globToRegexp("foo-[0-9]*")); + } + + private static GcsOptions gcsOptionsWithTestCredential() { + GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); + pipelineOptions.setGcpCredential(new TestCredential()); + return pipelineOptions; + } + + @Test + public void testCreationWithDefaultOptions() { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + assertNotNull(pipelineOptions.getGcpCredential()); + } + + @Test + public void testUploadBufferSizeDefault() { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil util = pipelineOptions.getGcsUtil(); + assertNull(util.getUploadBufferSizeBytes()); + } + + @Test + public void testUploadBufferSizeUserSpecified() { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + pipelineOptions.setGcsUploadBufferSizeBytes(12345); + GcsUtil util = pipelineOptions.getGcsUtil(); + assertEquals((Integer) 12345, util.getUploadBufferSizeBytes()); + } + + @Test + public void testCreationWithExecutorServiceProvided() { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + pipelineOptions.setExecutorService(Executors.newCachedThreadPool()); + assertSame(pipelineOptions.getExecutorService(), pipelineOptions.getGcsUtil().executorService); + } + + @Test + public void testCreationWithGcsUtilProvided() { + GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); + GcsUtil gcsUtil = Mockito.mock(GcsUtil.class); + pipelineOptions.setGcsUtil(gcsUtil); + assertSame(gcsUtil, pipelineOptions.getGcsUtil()); + } + + @Test + public void 
testMultipleThreadsCanCompleteOutOfOrderWithDefaultThreadPool() throws Exception { + GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); + ExecutorService executorService = pipelineOptions.getExecutorService(); + + int numThreads = 100; + final CountDownLatch[] countDownLatches = new CountDownLatch[numThreads]; + for (int i = 0; i < numThreads; i++) { + final int currentLatch = i; + countDownLatches[i] = new CountDownLatch(1); + executorService.execute( + new Runnable() { + @Override + public void run() { + // Wait for latch N and then release latch N - 1 + try { + countDownLatches[currentLatch].await(); + if (currentLatch > 0) { + countDownLatches[currentLatch - 1].countDown(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + }); + } + + // Release the last latch starting the chain reaction. + countDownLatches[countDownLatches.length - 1].countDown(); + executorService.shutdown(); + assertTrue("Expected tasks to complete", + executorService.awaitTermination(10, TimeUnit.SECONDS)); + } + + @Test + public void testGlobExpansion() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); + Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); + Storage.Objects.List mockStorageList = Mockito.mock(Storage.Objects.List.class); + + Objects modelObjects = new Objects(); + List<StorageObject> items = new ArrayList<>(); + // A directory + items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/")); + + // Files within the directory + items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file1name")); + items.add(new 
StorageObject().setBucket("testbucket").setName("testdirectory/file2name")); + items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file3name")); + items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile")); + items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/anotherfile")); + + modelObjects.setItems(items); + + when(mockStorage.objects()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket", "testdirectory/otherfile")).thenReturn( + mockStorageGet); + when(mockStorageObjects.list("testbucket")).thenReturn(mockStorageList); + when(mockStorageGet.execute()).thenReturn( + new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile")); + when(mockStorageList.execute()).thenReturn(modelObjects); + + // Test a single file. + { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"); + List<GcsPath> expectedFiles = + ImmutableList.of(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile")); + + assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); + } + + // Test patterns. 
+ { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*"); + List<GcsPath> expectedFiles = ImmutableList.of( + GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); + + assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); + } + + { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*"); + List<GcsPath> expectedFiles = ImmutableList.of( + GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); + + assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); + } + + { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name"); + List<GcsPath> expectedFiles = ImmutableList.of( + GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); + + assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); + } + + { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name"); + List<GcsPath> expectedFiles = ImmutableList.of( + GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); + + assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); + } + } + + // Patterns that contain recursive wildcards ('**') are not supported. 
+ @Test + public void testRecursiveGlobExpansionFails() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + GcsPath pattern = GcsPath.fromUri("gs://testbucket/test**"); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Unsupported wildcard usage"); + gcsUtil.expand(pattern); + } + + // GCSUtil.expand() should fail when matching a single object when that object does not exist. + // We should return the empty result since GCS get object is strongly consistent. + @Test + public void testNonExistentObjectReturnsEmptyResult() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); + Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); + + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/nonexistentfile"); + GoogleJsonResponseException expectedException = + googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND, + "It don't exist", "Nothing here to see"); + + when(mockStorage.objects()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn( + mockStorageGet); + when(mockStorageGet.execute()).thenThrow(expectedException); + + assertEquals(Collections.EMPTY_LIST, gcsUtil.expand(pattern)); + } + + // GCSUtil.expand() should fail for other errors such as access denied. 
+ @Test + public void testAccessDeniedObjectThrowsIOException() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); + Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); + + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/accessdeniedfile"); + GoogleJsonResponseException expectedException = + googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, + "Waves hand mysteriously", "These aren't the buckets you're looking for"); + + when(mockStorage.objects()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn( + mockStorageGet); + when(mockStorageGet.execute()).thenThrow(expectedException); + + thrown.expect(IOException.class); + thrown.expectMessage("Unable to get the file object for path"); + gcsUtil.expand(pattern); + } + + @Test + public void testFileSizeNonBatch() throws Exception { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); + Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); + + when(mockStorage.objects()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()).thenReturn( + new StorageObject().setSize(BigInteger.valueOf(1000))); + + assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"))); + } + + @Test + public void testFileSizeWhenFileNotFoundNonBatch() throws Exception { + MockLowLevelHttpResponse 
notFoundResponse = new MockLowLevelHttpResponse(); + notFoundResponse.setContent(""); + notFoundResponse.setStatusCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND); + + MockHttpTransport mockTransport = + new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build(); + + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); + + thrown.expect(FileNotFoundException.class); + gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject")); + } + + @Test + public void testRetryFileSizeNonBatch() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); + Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff(); + + when(mockStorage.objects()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()) + .thenThrow(new SocketTimeoutException("SocketException")) + .thenThrow(new SocketTimeoutException("SocketException")) + .thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000))); + + assertEquals(1000, + gcsUtil.getObject( + GcsPath.fromComponents("testbucket", "testobject"), + mockBackOff, + new FastNanoClockAndSleeper()).getSize().longValue()); + assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis()); + } + + @Test + public void testGetSizeBytesWhenFileNotFoundBatch() throws Exception { + JsonFactory jsonFactory = new JacksonFactory(); + + String contentBoundary = "batch_foobarbaz"; + String contentBoundaryLine = "--" + contentBoundary; + String 
endOfContentBoundaryLine = "--" + contentBoundary + "--"; + + GenericJson error = new GenericJson() + .set("error", new GenericJson().set("code", 404)); + error.setFactory(jsonFactory); + + String content = contentBoundaryLine + "\n" + + "Content-Type: application/http\n" + + "\n" + + "HTTP/1.1 404 Not Found\n" + + "Content-Length: -1\n" + + "\n" + + error.toString() + + "\n" + + "\n" + + endOfContentBoundaryLine + + "\n"; + thrown.expect(FileNotFoundException.class); + MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse() + .setContentType("multipart/mixed; boundary=" + contentBoundary) + .setContent(content) + .setStatusCode(HttpStatusCodes.STATUS_CODE_OK); + + MockHttpTransport mockTransport = + new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build(); + + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + + gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); + gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); + } + + @Test + public void testGetSizeBytesWhenFileNotFoundBatchRetry() throws Exception { + JsonFactory jsonFactory = new JacksonFactory(); + + String contentBoundary = "batch_foobarbaz"; + String contentBoundaryLine = "--" + contentBoundary; + String endOfContentBoundaryLine = "--" + contentBoundary + "--"; + + GenericJson error = new GenericJson() + .set("error", new GenericJson().set("code", 404)); + error.setFactory(jsonFactory); + + String content = contentBoundaryLine + "\n" + + "Content-Type: application/http\n" + + "\n" + + "HTTP/1.1 404 Not Found\n" + + "Content-Length: -1\n" + + "\n" + + error.toString() + + "\n" + + "\n" + + endOfContentBoundaryLine + + "\n"; + thrown.expect(FileNotFoundException.class); + + final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class); + when(mockResponse.getContentType()).thenReturn("multipart/mixed; boundary=" + contentBoundary); + + // 429: Too many 
requests, then 200: OK. + when(mockResponse.getStatusCode()).thenReturn(429, 200); + when(mockResponse.getContent()).thenReturn(toStream("error"), toStream(content)); + + // A mock transport that lets us mock the API responses. + MockHttpTransport mockTransport = + new MockHttpTransport.Builder() + .setLowLevelHttpRequest( + new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() throws IOException { + return mockResponse; + } + }) + .build(); + + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + + gcsUtil.setStorageClient( + new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())); + gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); + } + + @Test + public void testCreateBucket() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.insert( + any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert); + when(mockStorageInsert.execute()) + .thenThrow(new SocketTimeoutException("SocketException")) + .thenReturn(new Bucket()); + + gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()); + } + + @Test + public void testCreateBucketAccessErrors() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets mockStorageObjects = 
Mockito.mock(Storage.Buckets.class); + Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + GoogleJsonResponseException expectedException = + googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, + "Waves hand mysteriously", "These aren't the buckets you're looking for"); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.insert( + any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert); + when(mockStorageInsert.execute()) + .thenThrow(expectedException); + + thrown.expect(AccessDeniedException.class); + + gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()); + } + + @Test + public void testBucketAccessible() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); + Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()) + .thenThrow(new SocketTimeoutException("SocketException")) + .thenReturn(new Bucket()); + + assertTrue(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"), + mockBackOff, new FastNanoClockAndSleeper())); + } + + @Test + public void testBucketDoesNotExistBecauseOfAccessError() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets mockStorageObjects = 
Mockito.mock(Storage.Buckets.class); + Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + GoogleJsonResponseException expectedException = + googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, + "Waves hand mysteriously", "These aren't the buckets you're looking for"); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()) + .thenThrow(expectedException); + + assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"), + mockBackOff, new FastNanoClockAndSleeper())); + } + + @Test + public void testBucketDoesNotExist() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); + Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()) + .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND, + "It don't exist", "Nothing here to see")); + + assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"), + mockBackOff, new FastNanoClockAndSleeper())); + } + + @Test + public void testGetBucket() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets mockStorageObjects = 
Mockito.mock(Storage.Buckets.class); + Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()) + .thenThrow(new SocketTimeoutException("SocketException")) + .thenReturn(new Bucket()); + + assertNotNull(gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"), + mockBackOff, new FastNanoClockAndSleeper())); + } + + @Test + public void testGetBucketNotExists() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); + Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()) + .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND, + "It don't exist", "Nothing here to see")); + + thrown.expect(FileNotFoundException.class); + thrown.expectMessage("It don't exist"); + gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"), + mockBackOff, new FastNanoClockAndSleeper()); + } + + @Test + public void testGCSChannelCloseIdempotent() throws IOException { + SeekableByteChannel channel = + new GoogleCloudStorageReadChannel(null, "dummybucket", "dummyobject", null, + new ClientRequestHelper<StorageObject>()); + channel.close(); + channel.close(); + } + + /** + * Builds a fake GoogleJsonResponseException for testing API error handling. 
+ */ + private static GoogleJsonResponseException googleJsonResponseException( + final int status, final String reason, final String message) throws IOException { + final JsonFactory jsonFactory = new JacksonFactory(); + HttpTransport transport = new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) throws IOException { + ErrorInfo errorInfo = new ErrorInfo(); + errorInfo.setReason(reason); + errorInfo.setMessage(message); + errorInfo.setFactory(jsonFactory); + GenericJson error = new GenericJson(); + error.set("code", status); + error.set("errors", Arrays.asList(errorInfo)); + error.setFactory(jsonFactory); + GenericJson errorResponse = new GenericJson(); + errorResponse.set("error", error); + errorResponse.setFactory(jsonFactory); + return new MockLowLevelHttpRequest().setResponse( + new MockLowLevelHttpResponse().setContent(errorResponse.toPrettyString()) + .setContentType(Json.MEDIA_TYPE).setStatusCode(status)); + } + }; + HttpRequest request = + transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL); + request.setThrowExceptionOnExecuteError(false); + HttpResponse response = request.execute(); + return GoogleJsonResponseException.from(jsonFactory, response); + } + + private static List<String> makeStrings(String s, int n) { + ImmutableList.Builder<String> ret = ImmutableList.builder(); + for (int i = 0; i < n; ++i) { + ret.add(String.format("gs://bucket/%s%d", s, i)); + } + return ret.build(); + } + + private static List<GcsPath> makeGcsPaths(String s, int n) { + ImmutableList.Builder<GcsPath> ret = ImmutableList.builder(); + for (int i = 0; i < n; ++i) { + ret.add(GcsPath.fromUri(String.format("gs://bucket/%s%d", s, i))); + } + return ret.build(); + } + + private static int sumBatchSizes(List<BatchRequest> batches) { + int ret = 0; + for (BatchRequest b : batches) { + ret += b.size(); + assertThat(b.size(), greaterThan(0)); + } + return ret; + } + + @Test + public void 
testMakeCopyBatches() throws IOException { + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + + // Small number of files fits in 1 batch + List<BatchRequest> batches = gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 3)); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(3)); + + // 1 batch of files fits in 1 batch + batches = gcsUtil.makeCopyBatches(makeStrings("s", 100), makeStrings("d", 100)); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(100)); + + // A little more than 5 batches of files fits in 6 batches + batches = gcsUtil.makeCopyBatches(makeStrings("s", 501), makeStrings("d", 501)); + assertThat(batches.size(), equalTo(6)); + assertThat(sumBatchSizes(batches), equalTo(501)); + } + + @Test + public void testInvalidCopyBatches() throws IOException { + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Number of source files 3"); /* 3 source files but only 1 destination: must be rejected. */ + + gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 1)); + } + + @Test + public void testMakeRemoveBatches() throws IOException { + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + + // Small number of files fits in 1 batch + List<BatchRequest> batches = gcsUtil.makeRemoveBatches(makeStrings("s", 3)); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(3)); + + // 1 batch of files fits in 1 batch + batches = gcsUtil.makeRemoveBatches(makeStrings("s", 100)); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(100)); + + // A little more than 5 batches of files fits in 6 batches + batches = gcsUtil.makeRemoveBatches(makeStrings("s", 501)); + assertThat(batches.size(), equalTo(6)); + assertThat(sumBatchSizes(batches), equalTo(501)); + } + + @Test + public void testMakeGetBatches() throws IOException { + GcsUtil gcsUtil = 
gcsOptionsWithTestCredential().getGcsUtil(); + + // Small number of files fits in 1 batch + List<StorageObjectOrIOException[]> results = Lists.newArrayList(); + List<BatchRequest> batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), results); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(3)); + assertEquals(3, results.size()); + + // 1 batch of files fits in 1 batch + results = Lists.newArrayList(); + batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 100), results); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(100)); + assertEquals(100, results.size()); + + // A little more than 5 batches of files fits in 6 batches + results = Lists.newArrayList(); + batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 501), results); + assertThat(batches.size(), equalTo(6)); + assertThat(sumBatchSizes(batches), equalTo(501)); + assertEquals(501, results.size()); + } + + /** + * A helper to wrap a {@link String} in a content stream, encoding it as UTF-8.
+ */ + private static InputStream toStream(String content) throws IOException { + return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java new file mode 100644 index 0000000..37551a4 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpResponseException; +import com.google.api.client.http.HttpResponseInterceptor; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.LowLevelHttpRequest; +import com.google.api.client.http.LowLevelHttpResponse; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.client.testing.http.MockLowLevelHttpRequest; +import com.google.api.client.util.NanoClock; +import com.google.api.client.util.Sleeper; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.Storage.Objects.Get; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.security.PrivateKey; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicLong; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Tests for RetryHttpRequestInitializer. 
+ */ +@RunWith(JUnit4.class) +public class RetryHttpRequestInitializerTest { + + @Mock private PrivateKey mockPrivateKey; + @Mock private LowLevelHttpRequest mockLowLevelRequest; + @Mock private LowLevelHttpResponse mockLowLevelResponse; + @Mock private HttpResponseInterceptor mockHttpResponseInterceptor; + + private final JsonFactory jsonFactory = JacksonFactory.getDefaultInstance(); + private Storage storage; + + // Used to test retrying a request more than the default 10 times. Entries are + // timestamps in milliseconds; each one is reported for two consecutive nanoTime() calls. + static class MockNanoClock implements NanoClock { + private final int[] timesMs = {500, 750, 1125, 1688, 2531, 3797, 5695, 8543, + 12814, 19222, 28833, 43249, 64873, 97310, 145965, 218945, 328420}; + private int i = 0; + + @Override + public long nanoTime() { + // Multiply in long arithmetic: as an int product, any entry above 2147 ms + // (e.g. 2531 * 1000000) exceeds Integer.MAX_VALUE and wraps to a negative value. + return timesMs[i++ / 2] * 1000000L; + } + } + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + HttpTransport lowLevelTransport = new HttpTransport() { + @Override + protected LowLevelHttpRequest buildRequest(String method, String url) + throws IOException { + return mockLowLevelRequest; + } + }; + + // Retry initializer will pass through to credential, since we can have + // only a single HttpRequestInitializer, and we use multiple Credential + // types in the SDK, not all of which allow for retry configuration.
+ RetryHttpRequestInitializer initializer = new RetryHttpRequestInitializer( + new MockNanoClock(), new Sleeper() { + @Override + public void sleep(long millis) throws InterruptedException {} + }, Arrays.asList(418 /* I'm a teapot */), mockHttpResponseInterceptor); + storage = new Storage.Builder(lowLevelTransport, jsonFactory, initializer) + .setApplicationName("test").build(); + } + + @After + public void tearDown() { + verifyNoMoreInteractions(mockPrivateKey); + verifyNoMoreInteractions(mockLowLevelRequest); + verifyNoMoreInteractions(mockHttpResponseInterceptor); + } + + @Test + public void testBasicOperation() throws IOException { + when(mockLowLevelRequest.execute()) + .thenReturn(mockLowLevelResponse); + when(mockLowLevelResponse.getStatusCode()) + .thenReturn(200); + + Storage.Buckets.Get result = storage.buckets().get("test"); + HttpResponse response = result.executeUnparsed(); + assertNotNull(response); + + verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); + verify(mockLowLevelRequest, atLeastOnce()) + .addHeader(anyString(), anyString()); + verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt()); + verify(mockLowLevelRequest).execute(); + verify(mockLowLevelResponse).getStatusCode(); + } + + /** + * Tests that a non-retriable error is not retried. + */ + @Test + public void testErrorCodeForbidden() throws IOException { + when(mockLowLevelRequest.execute()) + .thenReturn(mockLowLevelResponse); + when(mockLowLevelResponse.getStatusCode()) + .thenReturn(403) // Non-retryable error. + .thenReturn(200); // Shouldn't happen.
+ + try { + Storage.Buckets.Get result = storage.buckets().get("test"); + result.executeUnparsed(); + // A non-retryable 403 must surface as an exception; returning normally is a failure. + fail("Expected HttpResponseException for a 403 response"); + } catch (HttpResponseException e) { + Assert.assertThat(e.getMessage(), Matchers.containsString("403")); + } + + verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); + verify(mockLowLevelRequest, atLeastOnce()) + .addHeader(anyString(), anyString()); + verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt()); + verify(mockLowLevelRequest).execute(); + verify(mockLowLevelResponse).getStatusCode(); + } + + /** + * Tests that a retriable error is retried. + */ + @Test + public void testRetryableError() throws IOException { + when(mockLowLevelRequest.execute()) + .thenReturn(mockLowLevelResponse) + .thenReturn(mockLowLevelResponse) + .thenReturn(mockLowLevelResponse); + when(mockLowLevelResponse.getStatusCode()) + .thenReturn(503) // Retryable + .thenReturn(429) // We also retry on 429 Too Many Requests. + .thenReturn(200); + + Storage.Buckets.Get result = storage.buckets().get("test"); + HttpResponse response = result.executeUnparsed(); + assertNotNull(response); + + verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); + verify(mockLowLevelRequest, atLeastOnce()) + .addHeader(anyString(), anyString()); + verify(mockLowLevelRequest, times(3)).setTimeout(anyInt(), anyInt()); + verify(mockLowLevelRequest, times(3)).execute(); + verify(mockLowLevelResponse, times(3)).getStatusCode(); + } + + /** + * Tests that an IOException is retried.
+ */ + @Test + public void testThrowIOException() throws IOException { + when(mockLowLevelRequest.execute()) + .thenThrow(new IOException("Fake Error")) + .thenReturn(mockLowLevelResponse); + when(mockLowLevelResponse.getStatusCode()) + .thenReturn(200); + + Storage.Buckets.Get result = storage.buckets().get("test"); + HttpResponse response = result.executeUnparsed(); + assertNotNull(response); + + verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); + verify(mockLowLevelRequest, atLeastOnce()) + .addHeader(anyString(), anyString()); + verify(mockLowLevelRequest, times(2)).setTimeout(anyInt(), anyInt()); + verify(mockLowLevelRequest, times(2)).execute(); + verify(mockLowLevelResponse).getStatusCode(); + } + + /** + * Tests that a retryable error is retried enough times. + */ + @Test + public void testRetryableErrorRetryEnoughTimes() throws IOException { + when(mockLowLevelRequest.execute()).thenReturn(mockLowLevelResponse); + final int retries = 10; + when(mockLowLevelResponse.getStatusCode()).thenAnswer(new Answer<Integer>(){ + int n = 0; + @Override + public Integer answer(InvocationOnMock invocation) { + return (n++ < retries - 1) ? 503 : 200; + }}); + + Storage.Buckets.Get result = storage.buckets().get("test"); + HttpResponse response = result.executeUnparsed(); + assertNotNull(response); + + verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); + verify(mockLowLevelRequest, atLeastOnce()).addHeader(anyString(), + anyString()); + verify(mockLowLevelRequest, times(retries)).setTimeout(anyInt(), anyInt()); + verify(mockLowLevelRequest, times(retries)).execute(); + verify(mockLowLevelResponse, times(retries)).getStatusCode(); + } + + /** + * Tests that when RPCs fail with {@link SocketTimeoutException}, the IO exception handler + * is invoked. + */ + @Test + public void testIOExceptionHandlerIsInvokedOnTimeout() throws Exception { + // Counts the number of calls to execute the HTTP request.
+ final AtomicLong executeCount = new AtomicLong(); + + // 10 is a private internal constant in the Google API Client library. See + // com.google.api.client.http.HttpRequest#setNumberOfRetries + // TODO: update this test once the private internal constant is public. + final int defaultNumberOfRetries = 10; + + // A mock HTTP request that always throws SocketTimeoutException. + MockHttpTransport transport = + new MockHttpTransport.Builder().setLowLevelHttpRequest(new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() throws IOException { + executeCount.incrementAndGet(); + throw new SocketTimeoutException("Fake forced timeout exception"); + } + }).build(); + + // A sample HTTP request to Google Cloud Storage that uses both default Transport and default + // RetryHttpInitializer. + Storage storage = new Storage.Builder( + transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()).build(); + + Get getRequest = storage.objects().get("gs://fake", "file"); + + try { + getRequest.execute(); + fail("Expected SocketTimeoutException"); + } catch (SocketTimeoutException e) { + // Catch only the expected type: catching Throwable would also trap the AssertionError + // thrown by fail() and report it as a confusing type mismatch. + assertThat(e, Matchers.<Throwable>instanceOf(SocketTimeoutException.class)); + assertEquals(1 + defaultNumberOfRetries, executeCount.get()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java new file mode 100644 index 0000000..426fb16 --- /dev/null +++ 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util.gcsfs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests of GcsPath. + */ +@RunWith(JUnit4.class) +public class GcsPathTest { + + /** + * Test case, which tests parsing and building of GcsPaths. + */ + static final class TestCase { + + final String uri; + final String expectedBucket; + final String expectedObject; + final String[] namedComponents; + + TestCase(String uri, String... namedComponents) { + this.uri = uri; + this.expectedBucket = namedComponents[0]; + this.namedComponents = namedComponents; + this.expectedObject = uri.substring(expectedBucket.length() + 6); /* +6 skips the 5-char "gs://" prefix and the "/" after the bucket; assumes every uri has the form "gs://<bucket>/...". 
*/ + } + } + + // Each test case is an expected URL, then the components used to build it.
+ // Empty components result in a double slash. + static final List<TestCase> PATH_TEST_CASES = Arrays.asList( + new TestCase("gs://bucket/then/object", "bucket", "then", "object"), + new TestCase("gs://bucket//then/object", "bucket", "", "then", "object"), + new TestCase("gs://bucket/then//object", "bucket", "then", "", "object"), + new TestCase("gs://bucket/then///object", "bucket", "then", "", "", "object"), + new TestCase("gs://bucket/then/object/", "bucket", "then", "object/"), + new TestCase("gs://bucket/then/object/", "bucket", "then/", "object/"), + new TestCase("gs://bucket/then/object//", "bucket", "then", "object", ""), + new TestCase("gs://bucket/then/object//", "bucket", "then", "object/", ""), + new TestCase("gs://bucket/", "bucket") + ); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testGcsPathParsing() throws Exception { + for (TestCase testCase : PATH_TEST_CASES) { + String uriString = testCase.uri; + + GcsPath path = GcsPath.fromUri(URI.create(uriString)); + // Deconstruction - check bucket, object, and components. + assertEquals(testCase.expectedBucket, path.getBucket()); + assertEquals(testCase.expectedObject, path.getObject()); + assertEquals(testCase.uri, + testCase.namedComponents.length, path.getNameCount()); + + // Construction - check that the path can be built from components.
+ GcsPath built = GcsPath.fromComponents(null, null); + for (String component : testCase.namedComponents) { + built = built.resolve(component); + } + assertEquals(testCase.uri, built.toString()); + } + } + + @Test + public void testParentRelationship() throws Exception { + GcsPath path = GcsPath.fromComponents("bucket", "then/object"); + assertEquals("bucket", path.getBucket()); + assertEquals("then/object", path.getObject()); + assertEquals(3, path.getNameCount()); + assertTrue(path.endsWith("object")); + assertTrue(path.startsWith("bucket/then")); + + GcsPath parent = path.getParent(); // gs://bucket/then/ + assertEquals("bucket", parent.getBucket()); + assertEquals("then/", parent.getObject()); + assertEquals(2, parent.getNameCount()); + assertThat(path, Matchers.not(Matchers.equalTo(parent))); + assertTrue(path.startsWith(parent)); + assertFalse(parent.startsWith(path)); + assertTrue(parent.endsWith("then/")); + assertTrue(parent.startsWith("bucket/then")); + assertTrue(parent.isAbsolute()); + + GcsPath root = path.getRoot(); + assertEquals(0, root.getNameCount()); + assertEquals("gs://", root.toString()); + assertEquals("", root.getBucket()); + assertEquals("", root.getObject()); + assertTrue(root.isAbsolute()); + assertThat(root, Matchers.equalTo(parent.getRoot())); + + GcsPath grandParent = parent.getParent(); // gs://bucket/ + assertEquals(1, grandParent.getNameCount()); + assertEquals("gs://bucket/", grandParent.toString()); + assertTrue(grandParent.isAbsolute()); + assertThat(root, Matchers.equalTo(grandParent.getParent())); + assertThat(root.getParent(), Matchers.nullValue()); + + assertTrue(path.startsWith(path.getRoot())); + assertTrue(parent.startsWith(path.getRoot())); + } + + @Test + public void testRelativeParent() throws Exception { + GcsPath path = GcsPath.fromComponents(null, "a/b"); + GcsPath parent = path.getParent(); + assertEquals("a/", parent.toString()); + + GcsPath grandParent = parent.getParent(); + assertNull(grandParent); + } + + @Test 
+ public void testUriSupport() throws Exception { + URI uri = URI.create("gs://bucket/some/path"); + + GcsPath path = GcsPath.fromUri(uri); + assertEquals("bucket", path.getBucket()); + assertEquals("some/path", path.getObject()); + + URI reconstructed = path.toUri(); + assertEquals(uri, reconstructed); + + path = GcsPath.fromUri("gs://bucket"); + assertEquals("gs://bucket/", path.toString()); + } + + @Test + public void testBucketParsing() throws Exception { + GcsPath path = GcsPath.fromUri("gs://bucket"); + GcsPath path2 = GcsPath.fromUri("gs://bucket/"); + + assertEquals(path, path2); + assertEquals(path.toString(), path2.toString()); + assertEquals(path.toUri(), path2.toUri()); + } + + @Test + public void testGcsPathToString() throws Exception { + String filename = "gs://some_bucket/some/file.txt"; + GcsPath path = GcsPath.fromUri(filename); + assertEquals(filename, path.toString()); + } + + @Test + public void testEquals() { + GcsPath a = GcsPath.fromComponents(null, "a/b/c"); + GcsPath a2 = GcsPath.fromComponents(null, "a/b/c"); + assertFalse(a.isAbsolute()); + assertFalse(a2.isAbsolute()); + + GcsPath b = GcsPath.fromComponents("bucket", "a/b/c"); + GcsPath b2 = GcsPath.fromComponents("bucket", "a/b/c"); + assertTrue(b.isAbsolute()); + assertTrue(b2.isAbsolute()); + + assertEquals(a, a); + assertThat(a, Matchers.not(Matchers.equalTo(b))); + assertThat(b, Matchers.not(Matchers.equalTo(a))); + + assertEquals(a, a2); + assertEquals(a2, a); + assertEquals(b, b2); + assertEquals(b2, b); + + assertThat(a, Matchers.not(Matchers.equalTo(Paths.get("/tmp/foo")))); + assertTrue(a != null); /* NOTE(review): trivially true; presumably meant to check that a.equals(null) is false. Confirm the intent. 
*/ + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidGcsPath() { + @SuppressWarnings("unused") + GcsPath filename = + GcsPath.fromUri("file://invalid/gcs/path"); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidBucket() { + GcsPath.fromComponents("invalid/", ""); + } + + @Test(expected = IllegalArgumentException.class) + 
public void testInvalidObject_newline() { + GcsPath.fromComponents(null, "a\nb"); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidObject_cr() { + GcsPath.fromComponents(null, "a\rb"); + } + + @Test + public void testResolveUri() { + GcsPath path = GcsPath.fromComponents("bucket", "a/b/c"); + GcsPath d = path.resolve("gs://bucket2/d"); + assertEquals("gs://bucket2/d", d.toString()); + } + + @Test + public void testResolveOther() { + GcsPath a = GcsPath.fromComponents("bucket", "a"); + GcsPath b = a.resolve(Paths.get("b")); + assertEquals("a/b", b.getObject()); + } + + @Test + public void testGetFileName() { + assertEquals("foo", GcsPath.fromUri("gs://bucket/bar/foo").getFileName().toString()); + assertEquals("foo", GcsPath.fromUri("gs://bucket/foo").getFileName().toString()); + thrown.expect(UnsupportedOperationException.class); + GcsPath.fromUri("gs://bucket/").getFileName(); + } + + @Test + public void testResolveSibling() { + assertEquals( + "gs://bucket/bar/moo", + GcsPath.fromUri("gs://bucket/bar/foo").resolveSibling("moo").toString()); + assertEquals( + "gs://bucket/moo", + GcsPath.fromUri("gs://bucket/foo").resolveSibling("moo").toString()); + thrown.expect(UnsupportedOperationException.class); + GcsPath.fromUri("gs://bucket/").resolveSibling("moo"); + } + + @Test + public void testCompareTo() { + GcsPath a = GcsPath.fromComponents("bucket", "a"); + GcsPath b = GcsPath.fromComponents("bucket", "b"); + GcsPath b2 = GcsPath.fromComponents("bucket2", "b"); + GcsPath brel = GcsPath.fromComponents(null, "b"); + GcsPath a2 = GcsPath.fromComponents("bucket", "a"); + GcsPath arel = GcsPath.fromComponents(null, "a"); + + assertThat(a.compareTo(b), Matchers.lessThan(0)); + assertThat(b.compareTo(a), Matchers.greaterThan(0)); + assertThat(a.compareTo(a2), Matchers.equalTo(0)); + + assertThat(a.hashCode(), Matchers.equalTo(a2.hashCode())); + assertThat(a.hashCode(), Matchers.not(Matchers.equalTo(b.hashCode()))); + 
assertThat(b.hashCode(), Matchers.not(Matchers.equalTo(brel.hashCode()))); + + assertThat(brel.compareTo(b), Matchers.lessThan(0)); + assertThat(b.compareTo(brel), Matchers.greaterThan(0)); + assertThat(arel.compareTo(brel), Matchers.lessThan(0)); + assertThat(brel.compareTo(arel), Matchers.greaterThan(0)); + + assertThat(b.compareTo(b2), Matchers.lessThan(0)); + assertThat(b2.compareTo(b), Matchers.greaterThan(0)); + } + + @Test + public void testCompareTo_ordering() { + GcsPath ab = GcsPath.fromComponents("bucket", "a/b"); + GcsPath abc = GcsPath.fromComponents("bucket", "a/b/c"); + GcsPath a1b = GcsPath.fromComponents("bucket", "a-1/b"); + + assertThat(ab.compareTo(a1b), Matchers.lessThan(0)); + assertThat(a1b.compareTo(ab), Matchers.greaterThan(0)); + + assertThat(ab.compareTo(abc), Matchers.lessThan(0)); + assertThat(abc.compareTo(ab), Matchers.greaterThan(0)); + } + + @Test + public void testCompareTo_buckets() { + GcsPath a = GcsPath.fromComponents(null, "a/b/c"); + GcsPath b = GcsPath.fromComponents("bucket", "a/b/c"); + + assertThat(a.compareTo(b), Matchers.lessThan(0)); + assertThat(b.compareTo(a), Matchers.greaterThan(0)); + } + + @Test + public void testIterator() { + GcsPath a = GcsPath.fromComponents("bucket", "a/b/c"); + Iterator<Path> it = a.iterator(); + + assertTrue(it.hasNext()); + assertEquals("gs://bucket/", it.next().toString()); + assertTrue(it.hasNext()); + assertEquals("a", it.next().toString()); + assertTrue(it.hasNext()); + assertEquals("b", it.next().toString()); + assertTrue(it.hasNext()); + assertEquals("c", it.next().toString()); + assertFalse(it.hasNext()); + } + + @Test + public void testSubpath() { + GcsPath a = GcsPath.fromComponents("bucket", "a/b/c/d"); + assertThat(a.subpath(0, 1).toString(), Matchers.equalTo("gs://bucket/")); + assertThat(a.subpath(0, 2).toString(), Matchers.equalTo("gs://bucket/a")); + assertThat(a.subpath(0, 3).toString(), Matchers.equalTo("gs://bucket/a/b")); + assertThat(a.subpath(0, 4).toString(), 
Matchers.equalTo("gs://bucket/a/b/c")); + assertThat(a.subpath(1, 2).toString(), Matchers.equalTo("a")); + assertThat(a.subpath(2, 3).toString(), Matchers.equalTo("b")); + assertThat(a.subpath(2, 4).toString(), Matchers.equalTo("b/c")); + assertThat(a.subpath(2, 5).toString(), Matchers.equalTo("b/c/d")); + } + + @Test + public void testGetName() { + GcsPath a = GcsPath.fromComponents("bucket", "a/b/c/d"); + assertEquals(5, a.getNameCount()); /* The bucket root "gs://bucket/" is name 0, followed by a, b, c, d. */ + assertThat(a.getName(0).toString(), Matchers.equalTo("gs://bucket/")); + assertThat(a.getName(1).toString(), Matchers.equalTo("a")); + assertThat(a.getName(2).toString(), Matchers.equalTo("b")); + assertThat(a.getName(3).toString(), Matchers.equalTo("c")); + assertThat(a.getName(4).toString(), Matchers.equalTo("d")); + } + + @Test(expected = IllegalArgumentException.class) + public void testSubPathError() { + GcsPath a = GcsPath.fromComponents("bucket", "a/b/c/d"); + a.subpath(1, 1); // throws IllegalArgumentException + Assert.fail(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml index 8a48eca..4c60f20 100644 --- a/sdks/java/extensions/pom.xml +++ b/sdks/java/extensions/pom.xml @@ -32,7 +32,7 @@ <name>Apache Beam :: SDKs :: Java :: Extensions</name> <modules> - <module>gcp-core</module> + <module>google-cloud-platform-core</module> <module>jackson</module> <module>join-library</module> <module>protobuf</module>
