http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java deleted file mode 100644 index 67d5880..0000000 --- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptionsTest.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.gcp.options; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects.Delete; -import com.google.api.services.storage.Storage; -import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; -import org.apache.beam.sdk.extensions.gcp.options.GoogleApiDebugOptions.GoogleApiTracer; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.util.Transport; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link GoogleApiDebugOptions}. */ -@RunWith(JUnit4.class) -public class GoogleApiDebugOptionsTest { - private static final String STORAGE_GET_TRACE = - "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}"; - private static final String STORAGE_GET_AND_LIST_TRACE = - "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"," - + "\"Objects.List\":\"ListTraceDestination\"}"; - private static final String STORAGE_TRACE = "--googleApiTrace={\"Storage\":\"TraceDestination\"}"; - - @Test - public void testWhenTracingMatches() throws Exception { - String[] args = new String[] {STORAGE_GET_TRACE}; - GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); - options.setGcpCredential(new TestCredential()); - assertNotNull(options.getGoogleApiTrace()); - - Storage.Objects.Get request = - Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); - assertEquals("GetTraceDestination", request.get("$trace")); - } - - @Test - public void testWhenTracingDoesNotMatch() throws Exception { - String[] args = new String[] {STORAGE_GET_TRACE}; - GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); - options.setGcpCredential(new TestCredential()); - - assertNotNull(options.getGoogleApiTrace()); - - Storage.Objects.List request = - Transport.newStorageClient(options).build().objects().list("testProjectId"); - assertNull(request.get("$trace")); - } - - @Test - public void testWithMultipleTraces() throws Exception { - String[] args = new String[] {STORAGE_GET_AND_LIST_TRACE}; - GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); - options.setGcpCredential(new TestCredential()); - - assertNotNull(options.getGoogleApiTrace()); - - Storage.Objects.Get getRequest = - Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); - assertEquals("GetTraceDestination", getRequest.get("$trace")); - - Storage.Objects.List listRequest = - Transport.newStorageClient(options).build().objects().list("testProjectId"); - assertEquals("ListTraceDestination", listRequest.get("$trace")); - } - - @Test - public void testMatchingAllCalls() throws Exception { - String[] args = new String[] {STORAGE_TRACE}; - GcsOptions options = - PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); - options.setGcpCredential(new TestCredential()); - - assertNotNull(options.getGoogleApiTrace()); - - Storage.Objects.Get getRequest = - Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); - assertEquals("TraceDestination", getRequest.get("$trace")); - - Storage.Objects.List listRequest = - Transport.newStorageClient(options).build().objects().list("testProjectId"); - assertEquals("TraceDestination", listRequest.get("$trace")); - } - - @Test - public void testMatchingAgainstClient() throws Exception { - GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); - options.setGcpCredential(new TestCredential()); - options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor( - Transport.newStorageClient(options).build(), "TraceDestination")); - - Storage.Objects.Get getRequest = - Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); - assertEquals("TraceDestination", getRequest.get("$trace")); - - Delete deleteRequest = GcpOptions.GcpTempLocationFactory.newCloudResourceManagerClient( - options.as(CloudResourceManagerOptions.class)) - .build().projects().delete("testProjectId"); - assertNull(deleteRequest.get("$trace")); - } - - @Test - public void testMatchingAgainstRequestType() throws Exception { - GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); - options.setGcpCredential(new TestCredential()); - options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor( - Transport.newStorageClient(options).build().objects() - .get("aProjectId", "aObjectId"), "TraceDestination")); - - Storage.Objects.Get getRequest = - Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); - assertEquals("TraceDestination", getRequest.get("$trace")); - - Storage.Objects.List listRequest = - Transport.newStorageClient(options).build().objects().list("testProjectId"); - assertNull(listRequest.get("$trace")); - } - - @Test - public void testDeserializationAndSerializationOfGoogleApiTracer() throws Exception { - String serializedValue = "{\"Api\":\"Token\"}"; - ObjectMapper objectMapper = new ObjectMapper(); - assertEquals(serializedValue, - objectMapper.writeValueAsString( - objectMapper.readValue(serializedValue, GoogleApiTracer.class))); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java deleted file mode 100644 index a29dd45..0000000 --- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.fail; - -import com.google.common.collect.Lists; -import java.util.ServiceLoader; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link GcsIOChannelFactoryRegistrar}. - */ -@RunWith(JUnit4.class) -public class GcsIOChannelFactoryRegistrarTest { - - @Test - public void testServiceLoader() { - for (IOChannelFactoryRegistrar registrar - : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) { - if (registrar instanceof GcsIOChannelFactoryRegistrar) { - return; - } - } - fail("Expected to find " + GcsIOChannelFactoryRegistrar.class); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java deleted file mode 100644 index f53490a..0000000 --- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; - -import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link GcsIOChannelFactoryTest}. */ -@RunWith(JUnit4.class) -public class GcsIOChannelFactoryTest { - private GcsIOChannelFactory factory; - - @Before - public void setUp() { - factory = GcsIOChannelFactory.fromOptions(PipelineOptionsFactory.as(GcsOptions.class)); - } - - @Test - public void testResolve() throws Exception { - assertEquals("gs://bucket/object", factory.resolve("gs://bucket", "object")); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java deleted file mode 100644 index 65fb228..0000000 --- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; -import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** Tests for {@link GcsPathValidator}. */ -@RunWith(JUnit4.class) -public class GcsPathValidatorTest { - @Rule public ExpectedException expectedException = ExpectedException.none(); - - @Mock private GcsUtil mockGcsUtil; - private GcsPathValidator validator; - - @Before - public void setUp() throws Exception { - MockitoAnnotations.initMocks(this); - when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true); - GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); - options.setGcpCredential(new TestCredential()); - options.setGcsUtil(mockGcsUtil); - validator = GcsPathValidator.fromOptions(options); - } - - @Test - public void testValidFilePattern() { - validator.validateInputFilePatternSupported("gs://bucket/path"); - } - - @Test - public void testInvalidFilePattern() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "Expected a valid 'gs://' path but was given '/local/path'"); - validator.validateInputFilePatternSupported("/local/path"); - } - - @Test - public void testFilePatternMissingBucket() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "Missing object or bucket in path: 'gs://input/', " - + "did you mean: 'gs://some-bucket/input'?"); - validator.validateInputFilePatternSupported("gs://input"); - } - - @Test - public void testWhenBucketDoesNotExist() throws Exception { - when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false); - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "Could not find file gs://non-existent-bucket/location"); - validator.validateInputFilePatternSupported("gs://non-existent-bucket/location"); - } - - @Test - public void testValidOutputPrefix() { - validator.validateOutputFilePrefixSupported("gs://bucket/path"); - } - - @Test - public void testInvalidOutputPrefix() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "Expected a valid 'gs://' path but was given '/local/path'"); - validator.validateOutputFilePrefixSupported("/local/path"); - } - - @Test - public void testOutputPrefixMissingBucket() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "Missing object or bucket in path: 'gs://output/', " - + "did you mean: 'gs://some-bucket/output'?"); - validator.validateOutputFilePrefixSupported("gs://output"); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java deleted file mode 100644 index 6ffcaeb..0000000 --- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java +++ /dev/null @@ -1,799 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.when; - -import com.google.api.client.googleapis.batch.BatchRequest; -import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.http.HttpRequest; -import com.google.api.client.http.HttpResponse; -import com.google.api.client.http.HttpStatusCodes; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.http.LowLevelHttpRequest; -import com.google.api.client.http.LowLevelHttpResponse; -import com.google.api.client.json.GenericJson; -import com.google.api.client.json.Json; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.client.testing.http.HttpTesting; -import com.google.api.client.testing.http.MockHttpTransport; -import com.google.api.client.testing.http.MockLowLevelHttpRequest; -import com.google.api.client.testing.http.MockLowLevelHttpResponse; -import com.google.api.client.util.BackOff; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.Bucket; -import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.StorageObject; -import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel; -import com.google.cloud.hadoop.util.ClientRequestHelper; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import java.io.ByteArrayInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.math.BigInteger; -import java.net.SocketTimeoutException; -import java.nio.channels.SeekableByteChannel; -import java.nio.charset.StandardCharsets; -import java.nio.file.AccessDeniedException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; -import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; -import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mockito; - -/** Test case for {@link GcsUtil}. */ -@RunWith(JUnit4.class) -public class GcsUtilTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - - @Test - public void testGlobTranslation() { - assertEquals("foo", GcsUtil.globToRegexp("foo")); - assertEquals("fo[^/]*o", GcsUtil.globToRegexp("fo*o")); - assertEquals("f[^/]*o\\.[^/]", GcsUtil.globToRegexp("f*o.?")); - assertEquals("foo-[0-9][^/]*", GcsUtil.globToRegexp("foo-[0-9]*")); - } - - private static GcsOptions gcsOptionsWithTestCredential() { - GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); - pipelineOptions.setGcpCredential(new TestCredential()); - return pipelineOptions; - } - - @Test - public void testCreationWithDefaultOptions() { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - assertNotNull(pipelineOptions.getGcpCredential()); - } - - @Test - public void testUploadBufferSizeDefault() { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil util = pipelineOptions.getGcsUtil(); - assertNull(util.getUploadBufferSizeBytes()); - } - - @Test - public void testUploadBufferSizeUserSpecified() { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - pipelineOptions.setGcsUploadBufferSizeBytes(12345); - GcsUtil util = pipelineOptions.getGcsUtil(); - assertEquals((Integer) 12345, util.getUploadBufferSizeBytes()); - } - - @Test - public void testCreationWithExecutorServiceProvided() { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - pipelineOptions.setExecutorService(Executors.newCachedThreadPool()); - assertSame(pipelineOptions.getExecutorService(), pipelineOptions.getGcsUtil().executorService); - } - - @Test - public void testCreationWithGcsUtilProvided() { - GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); - GcsUtil gcsUtil = Mockito.mock(GcsUtil.class); - pipelineOptions.setGcsUtil(gcsUtil); - assertSame(gcsUtil, pipelineOptions.getGcsUtil()); - } - - @Test - public void testMultipleThreadsCanCompleteOutOfOrderWithDefaultThreadPool() throws Exception { - GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); - ExecutorService executorService = pipelineOptions.getExecutorService(); - - int numThreads = 100; - final CountDownLatch[] countDownLatches = new CountDownLatch[numThreads]; - for (int i = 0; i < numThreads; i++) { - final int currentLatch = i; - countDownLatches[i] = new CountDownLatch(1); - executorService.execute( - new Runnable() { - @Override - public void run() { - // Wait for latch N and then release latch N - 1 - try { - countDownLatches[currentLatch].await(); - if (currentLatch > 0) { - countDownLatches[currentLatch - 1].countDown(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - }); - } - - // Release the last latch starting the chain reaction. - countDownLatches[countDownLatches.length - 1].countDown(); - executorService.shutdown(); - assertTrue("Expected tasks to complete", - executorService.awaitTermination(10, TimeUnit.SECONDS)); - } - - @Test - public void testGlobExpansion() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); - Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); - Storage.Objects.List mockStorageList = Mockito.mock(Storage.Objects.List.class); - - Objects modelObjects = new Objects(); - List<StorageObject> items = new ArrayList<>(); - // A directory - items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/")); - - // Files within the directory - items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file1name")); - items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file2name")); - items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file3name")); - items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile")); - items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/anotherfile")); - - modelObjects.setItems(items); - - when(mockStorage.objects()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket", "testdirectory/otherfile")).thenReturn( - mockStorageGet); - when(mockStorageObjects.list("testbucket")).thenReturn(mockStorageList); - when(mockStorageGet.execute()).thenReturn( - new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile")); - when(mockStorageList.execute()).thenReturn(modelObjects); - - // Test a single file. - { - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"); - List<GcsPath> expectedFiles = - ImmutableList.of(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile")); - - assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); - } - - // Test patterns. - { - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*"); - List<GcsPath> expectedFiles = ImmutableList.of( - GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); - - assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); - } - - { - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*"); - List<GcsPath> expectedFiles = ImmutableList.of( - GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); - - assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); - } - - { - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name"); - List<GcsPath> expectedFiles = ImmutableList.of( - GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); - - assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); - } - - { - GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name"); - List<GcsPath> expectedFiles = ImmutableList.of( - GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); - - assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); - } - } - - // Patterns that contain recursive wildcards ('**') are not supported. - @Test - public void testRecursiveGlobExpansionFails() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - GcsPath pattern = GcsPath.fromUri("gs://testbucket/test**"); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Unsupported wildcard usage"); - gcsUtil.expand(pattern); - } - - // GCSUtil.expand() should fail when matching a single object when that object does not exist. - // We should return the empty result since GCS get object is strongly consistent. - @Test - public void testNonExistentObjectReturnsEmptyResult() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); - Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); - - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/nonexistentfile"); - GoogleJsonResponseException expectedException = - googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND, - "It don't exist", "Nothing here to see"); - - when(mockStorage.objects()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn( - mockStorageGet); - when(mockStorageGet.execute()).thenThrow(expectedException); - - assertEquals(Collections.EMPTY_LIST, gcsUtil.expand(pattern)); - } - - // GCSUtil.expand() should fail for other errors such as access denied. - @Test - public void testAccessDeniedObjectThrowsIOException() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); - Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); - - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/accessdeniedfile"); - GoogleJsonResponseException expectedException = - googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, - "Waves hand mysteriously", "These aren't the buckets you're looking for"); - - when(mockStorage.objects()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn( - mockStorageGet); - when(mockStorageGet.execute()).thenThrow(expectedException); - - thrown.expect(IOException.class); - thrown.expectMessage("Unable to get the file object for path"); - gcsUtil.expand(pattern); - } - - @Test - public void testFileSizeNonBatch() throws Exception { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); - Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); - - when(mockStorage.objects()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet); - when(mockStorageGet.execute()).thenReturn( - new StorageObject().setSize(BigInteger.valueOf(1000))); - - assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"))); - } - - @Test - public void testFileSizeWhenFileNotFoundNonBatch() throws Exception { - MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse(); - notFoundResponse.setContent(""); - notFoundResponse.setStatusCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND); - - MockHttpTransport mockTransport = - new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build(); - - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); - - thrown.expect(FileNotFoundException.class); - gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject")); - } - - @Test - public void testRetryFileSizeNonBatch() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); - Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff(); - - when(mockStorage.objects()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet); - when(mockStorageGet.execute()) - .thenThrow(new SocketTimeoutException("SocketException")) - .thenThrow(new SocketTimeoutException("SocketException")) - .thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000))); - - assertEquals(1000, - gcsUtil.getObject( - GcsPath.fromComponents("testbucket", "testobject"), - mockBackOff, - new FastNanoClockAndSleeper()).getSize().longValue()); - assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis()); - } - - @Test - public void testGetSizeBytesWhenFileNotFoundBatch() throws Exception { - JsonFactory jsonFactory = new JacksonFactory(); - - String contentBoundary = "batch_foobarbaz"; - String contentBoundaryLine = "--" + contentBoundary; - String endOfContentBoundaryLine = "--" + contentBoundary + "--"; - - GenericJson error = new GenericJson() - .set("error", new GenericJson().set("code", 404)); - error.setFactory(jsonFactory); - - String content = contentBoundaryLine + "\n" - + "Content-Type: application/http\n" - + "\n" - + "HTTP/1.1 404 Not Found\n" - + "Content-Length: -1\n" - + "\n" - + error.toString() - + "\n" - + "\n" - + endOfContentBoundaryLine - + "\n"; - thrown.expect(FileNotFoundException.class); - MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse() - .setContentType("multipart/mixed; boundary=" + contentBoundary) - .setContent(content) - .setStatusCode(HttpStatusCodes.STATUS_CODE_OK); - - MockHttpTransport mockTransport = - new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build(); - - GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - - gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); - gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); - } - - @Test - public void testGetSizeBytesWhenFileNotFoundBatchRetry() throws Exception { - JsonFactory jsonFactory = new JacksonFactory(); - - String contentBoundary = "batch_foobarbaz"; - String contentBoundaryLine = "--" + contentBoundary; - String endOfContentBoundaryLine = "--" + contentBoundary + "--"; - - GenericJson error = new GenericJson() - .set("error", new GenericJson().set("code", 404)); - error.setFactory(jsonFactory); - - String content = contentBoundaryLine + "\n" - + "Content-Type: application/http\n" - + "\n" - + "HTTP/1.1 404 Not Found\n" - + "Content-Length: -1\n" - + "\n" - + error.toString() - + "\n" - + "\n" - + endOfContentBoundaryLine - + "\n"; - thrown.expect(FileNotFoundException.class); - - final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class); - when(mockResponse.getContentType()).thenReturn("multipart/mixed; boundary=" + contentBoundary); - - // 429: Too many requests, then 200: OK. - when(mockResponse.getStatusCode()).thenReturn(429, 200); - when(mockResponse.getContent()).thenReturn(toStream("error"), toStream(content)); - - // A mock transport that lets us mock the API responses. - MockHttpTransport mockTransport = - new MockHttpTransport.Builder() - .setLowLevelHttpRequest( - new MockLowLevelHttpRequest() { - @Override - public LowLevelHttpResponse execute() throws IOException { - return mockResponse; - } - }) - .build(); - - GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - - gcsUtil.setStorageClient( - new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())); - gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); - } - - @Test - public void testCreateBucket() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - - when(mockStorage.buckets()).thenReturn(mockStorageObjects); - when(mockStorageObjects.insert( - any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert); - when(mockStorageInsert.execute()) - .thenThrow(new SocketTimeoutException("SocketException")) - .thenReturn(new Bucket()); - - gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()); - } - - @Test - public void testCreateBucketAccessErrors() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); - Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - GoogleJsonResponseException expectedException = - googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, - "Waves hand mysteriously", "These aren't the buckets you're looking for"); - - when(mockStorage.buckets()).thenReturn(mockStorageObjects); - when(mockStorageObjects.insert( - any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert); - when(mockStorageInsert.execute()) - .thenThrow(expectedException); - - thrown.expect(AccessDeniedException.class); - - gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()); - } - - @Test - public void testBucketAccessible() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); - Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - - when(mockStorage.buckets()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); - when(mockStorageGet.execute()) - .thenThrow(new SocketTimeoutException("SocketException")) - .thenReturn(new Bucket()); - - assertTrue(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"), - mockBackOff, new FastNanoClockAndSleeper())); - } - - @Test - public void testBucketDoesNotExistBecauseOfAccessError() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); - Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - GoogleJsonResponseException expectedException = - googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, - "Waves hand mysteriously", "These aren't the buckets you're looking for"); - - when(mockStorage.buckets()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); - when(mockStorageGet.execute()) - .thenThrow(expectedException); - - assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"), - mockBackOff, new FastNanoClockAndSleeper())); - } - - @Test - public void testBucketDoesNotExist() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); - Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - - when(mockStorage.buckets()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); - when(mockStorageGet.execute()) - .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND, - "It don't exist", "Nothing here to see")); - - assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"), - mockBackOff, new FastNanoClockAndSleeper())); - } - - @Test - public void testGetBucket() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); - Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - - when(mockStorage.buckets()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); - when(mockStorageGet.execute()) - .thenThrow(new SocketTimeoutException("SocketException")) - .thenReturn(new Bucket()); - - assertNotNull(gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"), - mockBackOff, new FastNanoClockAndSleeper())); - } - - @Test - public void testGetBucketNotExists() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); - Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - - when(mockStorage.buckets()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); - when(mockStorageGet.execute()) - .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND, - "It don't exist", "Nothing here to see")); - - thrown.expect(FileNotFoundException.class); - thrown.expectMessage("It don't exist"); - gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"), - mockBackOff, new FastNanoClockAndSleeper()); - } - - @Test - public void testGCSChannelCloseIdempotent() throws IOException { - SeekableByteChannel channel = - new GoogleCloudStorageReadChannel(null, "dummybucket", "dummyobject", null, - new ClientRequestHelper<StorageObject>()); - channel.close(); - channel.close(); - } - - /** - * Builds a fake GoogleJsonResponseException for testing API error handling. - */ - private static GoogleJsonResponseException googleJsonResponseException( - final int status, final String reason, final String message) throws IOException { - final JsonFactory jsonFactory = new JacksonFactory(); - HttpTransport transport = new MockHttpTransport() { - @Override - public LowLevelHttpRequest buildRequest(String method, String url) throws IOException { - ErrorInfo errorInfo = new ErrorInfo(); - errorInfo.setReason(reason); - errorInfo.setMessage(message); - errorInfo.setFactory(jsonFactory); - GenericJson error = new GenericJson(); - error.set("code", status); - error.set("errors", Arrays.asList(errorInfo)); - error.setFactory(jsonFactory); - GenericJson errorResponse = new GenericJson(); - errorResponse.set("error", error); - errorResponse.setFactory(jsonFactory); - return new MockLowLevelHttpRequest().setResponse( - new MockLowLevelHttpResponse().setContent(errorResponse.toPrettyString()) - .setContentType(Json.MEDIA_TYPE).setStatusCode(status)); - } - }; - HttpRequest request = - transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL); - request.setThrowExceptionOnExecuteError(false); - HttpResponse response = request.execute(); - return GoogleJsonResponseException.from(jsonFactory, response); - } - - private static List<String> makeStrings(String s, int n) { - ImmutableList.Builder<String> ret = ImmutableList.builder(); - for (int i = 0; i < n; ++i) { - ret.add(String.format("gs://bucket/%s%d", s, i)); - } - return ret.build(); - } - - private static List<GcsPath> makeGcsPaths(String s, int n) { - ImmutableList.Builder<GcsPath> ret = ImmutableList.builder(); - for (int i = 0; i < n; ++i) { - ret.add(GcsPath.fromUri(String.format("gs://bucket/%s%d", s, i))); - } - return ret.build(); - } - - private static int sumBatchSizes(List<BatchRequest> batches) { - int ret = 0; - for (BatchRequest b : batches) { - ret += b.size(); - assertThat(b.size(), greaterThan(0)); - } - return ret; - } - - @Test - public void testMakeCopyBatches() throws IOException { - GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - - // Small number of files fits in 1 batch - List<BatchRequest> batches = gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 3)); - assertThat(batches.size(), equalTo(1)); - assertThat(sumBatchSizes(batches), equalTo(3)); - - // 1 batch of files fits in 1 batch - batches = gcsUtil.makeCopyBatches(makeStrings("s", 100), makeStrings("d", 100)); - assertThat(batches.size(), equalTo(1)); - assertThat(sumBatchSizes(batches), equalTo(100)); - - // A little more than 5 batches of files fits in 6 batches - batches = gcsUtil.makeCopyBatches(makeStrings("s", 501), makeStrings("d", 501)); - assertThat(batches.size(), equalTo(6)); - assertThat(sumBatchSizes(batches), equalTo(501)); - } - - @Test - public void testInvalidCopyBatches() throws IOException { - GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Number of source files 3"); - - gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 1)); - } - - @Test - public void testMakeRemoveBatches() throws IOException { - GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - - // Small number of files fits in 1 batch - List<BatchRequest> batches = gcsUtil.makeRemoveBatches(makeStrings("s", 3)); - assertThat(batches.size(), equalTo(1)); - assertThat(sumBatchSizes(batches), equalTo(3)); - - // 1 batch of files fits in 1 batch - batches = gcsUtil.makeRemoveBatches(makeStrings("s", 100)); - assertThat(batches.size(), equalTo(1)); - assertThat(sumBatchSizes(batches), equalTo(100)); - - // A little more than 5 batches of files fits in 6 batches - batches = gcsUtil.makeRemoveBatches(makeStrings("s", 501)); - assertThat(batches.size(), equalTo(6)); - assertThat(sumBatchSizes(batches), equalTo(501)); - } - - @Test - public void testMakeGetBatches() throws IOException { - GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - - // Small number of files fits in 1 batch - List<StorageObjectOrIOException[]> results = Lists.newArrayList(); - List<BatchRequest> batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), results); - assertThat(batches.size(), equalTo(1)); - assertThat(sumBatchSizes(batches), equalTo(3)); - assertEquals(3, results.size()); - - // 1 batch of files fits in 1 batch - results = Lists.newArrayList(); - batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 100), results); - assertThat(batches.size(), equalTo(1)); - assertThat(sumBatchSizes(batches), equalTo(100)); - assertEquals(100, results.size()); - - // A little more than 5 batches of files fits in 6 batches - results = Lists.newArrayList(); - batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 501), results); - assertThat(batches.size(), equalTo(6)); - assertThat(sumBatchSizes(batches), equalTo(501)); - assertEquals(501, results.size()); - } - - /** - * A helper to wrap a {@link GenericJson} object in a content stream. - */ - private static InputStream toStream(String content) throws IOException { - return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java deleted file mode 100644 index 37551a4..0000000 --- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import com.google.api.client.http.HttpResponse; -import com.google.api.client.http.HttpResponseException; -import com.google.api.client.http.HttpResponseInterceptor; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.http.LowLevelHttpRequest; -import com.google.api.client.http.LowLevelHttpResponse; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.client.testing.http.MockHttpTransport; -import com.google.api.client.testing.http.MockLowLevelHttpRequest; -import com.google.api.client.util.NanoClock; -import com.google.api.client.util.Sleeper; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.Storage.Objects.Get; -import java.io.IOException; -import java.net.SocketTimeoutException; -import java.security.PrivateKey; -import java.util.Arrays; -import java.util.concurrent.atomic.AtomicLong; -import org.hamcrest.Matchers; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -/** - * Tests for RetryHttpRequestInitializer. - */ -@RunWith(JUnit4.class) -public class RetryHttpRequestInitializerTest { - - @Mock private PrivateKey mockPrivateKey; - @Mock private LowLevelHttpRequest mockLowLevelRequest; - @Mock private LowLevelHttpResponse mockLowLevelResponse; - @Mock private HttpResponseInterceptor mockHttpResponseInterceptor; - - private final JsonFactory jsonFactory = JacksonFactory.getDefaultInstance(); - private Storage storage; - - // Used to test retrying a request more than the default 10 times. - static class MockNanoClock implements NanoClock { - private int timesMs[] = {500, 750, 1125, 1688, 2531, 3797, 5695, 8543, - 12814, 19222, 28833, 43249, 64873, 97310, 145965, 218945, 328420}; - private int i = 0; - - @Override - public long nanoTime() { - return timesMs[i++ / 2] * 1000000; - } - } - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - - HttpTransport lowLevelTransport = new HttpTransport() { - @Override - protected LowLevelHttpRequest buildRequest(String method, String url) - throws IOException { - return mockLowLevelRequest; - } - }; - - // Retry initializer will pass through to credential, since we can have - // only a single HttpRequestInitializer, and we use multiple Credential - // types in the SDK, not all of which allow for retry configuration. - RetryHttpRequestInitializer initializer = new RetryHttpRequestInitializer( - new MockNanoClock(), new Sleeper() { - @Override - public void sleep(long millis) throws InterruptedException {} - }, Arrays.asList(418 /* I'm a teapot */), mockHttpResponseInterceptor); - storage = new Storage.Builder(lowLevelTransport, jsonFactory, initializer) - .setApplicationName("test").build(); - } - - @After - public void tearDown() { - verifyNoMoreInteractions(mockPrivateKey); - verifyNoMoreInteractions(mockLowLevelRequest); - verifyNoMoreInteractions(mockHttpResponseInterceptor); - } - - @Test - public void testBasicOperation() throws IOException { - when(mockLowLevelRequest.execute()) - .thenReturn(mockLowLevelResponse); - when(mockLowLevelResponse.getStatusCode()) - .thenReturn(200); - - Storage.Buckets.Get result = storage.buckets().get("test"); - HttpResponse response = result.executeUnparsed(); - assertNotNull(response); - - verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); - verify(mockLowLevelRequest, atLeastOnce()) - .addHeader(anyString(), anyString()); - verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt()); - verify(mockLowLevelRequest).execute(); - verify(mockLowLevelResponse).getStatusCode(); - } - - /** - * Tests that a non-retriable error is not retried. - */ - @Test - public void testErrorCodeForbidden() throws IOException { - when(mockLowLevelRequest.execute()) - .thenReturn(mockLowLevelResponse); - when(mockLowLevelResponse.getStatusCode()) - .thenReturn(403) // Non-retryable error. - .thenReturn(200); // Shouldn't happen. - - try { - Storage.Buckets.Get result = storage.buckets().get("test"); - HttpResponse response = result.executeUnparsed(); - assertNotNull(response); - } catch (HttpResponseException e) { - Assert.assertThat(e.getMessage(), Matchers.containsString("403")); - } - - verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); - verify(mockLowLevelRequest, atLeastOnce()) - .addHeader(anyString(), anyString()); - verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt()); - verify(mockLowLevelRequest).execute(); - verify(mockLowLevelResponse).getStatusCode(); - } - - /** - * Tests that a retriable error is retried. - */ - @Test - public void testRetryableError() throws IOException { - when(mockLowLevelRequest.execute()) - .thenReturn(mockLowLevelResponse) - .thenReturn(mockLowLevelResponse) - .thenReturn(mockLowLevelResponse); - when(mockLowLevelResponse.getStatusCode()) - .thenReturn(503) // Retryable - .thenReturn(429) // We also retry on 429 Too Many Requests. - .thenReturn(200); - - Storage.Buckets.Get result = storage.buckets().get("test"); - HttpResponse response = result.executeUnparsed(); - assertNotNull(response); - - verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); - verify(mockLowLevelRequest, atLeastOnce()) - .addHeader(anyString(), anyString()); - verify(mockLowLevelRequest, times(3)).setTimeout(anyInt(), anyInt()); - verify(mockLowLevelRequest, times(3)).execute(); - verify(mockLowLevelResponse, times(3)).getStatusCode(); - } - - /** - * Tests that an IOException is retried. - */ - @Test - public void testThrowIOException() throws IOException { - when(mockLowLevelRequest.execute()) - .thenThrow(new IOException("Fake Error")) - .thenReturn(mockLowLevelResponse); - when(mockLowLevelResponse.getStatusCode()) - .thenReturn(200); - - Storage.Buckets.Get result = storage.buckets().get("test"); - HttpResponse response = result.executeUnparsed(); - assertNotNull(response); - - verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); - verify(mockLowLevelRequest, atLeastOnce()) - .addHeader(anyString(), anyString()); - verify(mockLowLevelRequest, times(2)).setTimeout(anyInt(), anyInt()); - verify(mockLowLevelRequest, times(2)).execute(); - verify(mockLowLevelResponse).getStatusCode(); - } - - /** - * Tests that a retryable error is retried enough times. - */ - @Test - public void testRetryableErrorRetryEnoughTimes() throws IOException { - when(mockLowLevelRequest.execute()).thenReturn(mockLowLevelResponse); - final int retries = 10; - when(mockLowLevelResponse.getStatusCode()).thenAnswer(new Answer<Integer>(){ - int n = 0; - @Override - public Integer answer(InvocationOnMock invocation) { - return (n++ < retries - 1) ? 503 : 200; - }}); - - Storage.Buckets.Get result = storage.buckets().get("test"); - HttpResponse response = result.executeUnparsed(); - assertNotNull(response); - - verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); - verify(mockLowLevelRequest, atLeastOnce()).addHeader(anyString(), - anyString()); - verify(mockLowLevelRequest, times(retries)).setTimeout(anyInt(), anyInt()); - verify(mockLowLevelRequest, times(retries)).execute(); - verify(mockLowLevelResponse, times(retries)).getStatusCode(); - } - - /** - * Tests that when RPCs fail with {@link SocketTimeoutException}, the IO exception handler - * is invoked. - */ - @Test - public void testIOExceptionHandlerIsInvokedOnTimeout() throws Exception { - // Counts the number of calls to execute the HTTP request. - final AtomicLong executeCount = new AtomicLong(); - - // 10 is a private internal constant in the Google API Client library. See - // com.google.api.client.http.HttpRequest#setNumberOfRetries - // TODO: update this test once the private internal constant is public. - final int defaultNumberOfRetries = 10; - - // A mock HTTP request that always throws SocketTimeoutException. - MockHttpTransport transport = - new MockHttpTransport.Builder().setLowLevelHttpRequest(new MockLowLevelHttpRequest() { - @Override - public LowLevelHttpResponse execute() throws IOException { - executeCount.incrementAndGet(); - throw new SocketTimeoutException("Fake forced timeout exception"); - } - }).build(); - - // A sample HTTP request to Google Cloud Storage that uses both default Transport and default - // RetryHttpInitializer. - Storage storage = new Storage.Builder( - transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()).build(); - - Get getRequest = storage.objects().get("gs://fake", "file"); - - try { - getRequest.execute(); - fail(); - } catch (Throwable e) { - assertThat(e, Matchers.<Throwable>instanceOf(SocketTimeoutException.class)); - assertEquals(1 + defaultNumberOfRetries, executeCount.get()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java deleted file mode 100644 index 426fb16..0000000 --- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/gcsfs/GcsPathTest.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.gcsfs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import java.net.URI; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import org.hamcrest.Matchers; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests of GcsPath. - */ -@RunWith(JUnit4.class) -public class GcsPathTest { - - /** - * Test case, which tests parsing and building of GcsPaths. - */ - static final class TestCase { - - final String uri; - final String expectedBucket; - final String expectedObject; - final String[] namedComponents; - - TestCase(String uri, String... namedComponents) { - this.uri = uri; - this.expectedBucket = namedComponents[0]; - this.namedComponents = namedComponents; - this.expectedObject = uri.substring(expectedBucket.length() + 6); - } - } - - // Each test case is an expected URL, then the components used to build it. - // Empty components result in a double slash. - static final List<TestCase> PATH_TEST_CASES = Arrays.asList( - new TestCase("gs://bucket/then/object", "bucket", "then", "object"), - new TestCase("gs://bucket//then/object", "bucket", "", "then", "object"), - new TestCase("gs://bucket/then//object", "bucket", "then", "", "object"), - new TestCase("gs://bucket/then///object", "bucket", "then", "", "", "object"), - new TestCase("gs://bucket/then/object/", "bucket", "then", "object/"), - new TestCase("gs://bucket/then/object/", "bucket", "then/", "object/"), - new TestCase("gs://bucket/then/object//", "bucket", "then", "object", ""), - new TestCase("gs://bucket/then/object//", "bucket", "then", "object/", ""), - new TestCase("gs://bucket/", "bucket") - ); - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Test - public void testGcsPathParsing() throws Exception { - for (TestCase testCase : PATH_TEST_CASES) { - String uriString = testCase.uri; - - GcsPath path = GcsPath.fromUri(URI.create(uriString)); - // Deconstruction - check bucket, object, and components. - assertEquals(testCase.expectedBucket, path.getBucket()); - assertEquals(testCase.expectedObject, path.getObject()); - assertEquals(testCase.uri, - testCase.namedComponents.length, path.getNameCount()); - - // Construction - check that the path can be built from components. - GcsPath built = GcsPath.fromComponents(null, null); - for (String component : testCase.namedComponents) { - built = built.resolve(component); - } - assertEquals(testCase.uri, built.toString()); - } - } - - @Test - public void testParentRelationship() throws Exception { - GcsPath path = GcsPath.fromComponents("bucket", "then/object"); - assertEquals("bucket", path.getBucket()); - assertEquals("then/object", path.getObject()); - assertEquals(3, path.getNameCount()); - assertTrue(path.endsWith("object")); - assertTrue(path.startsWith("bucket/then")); - - GcsPath parent = path.getParent(); // gs://bucket/then/ - assertEquals("bucket", parent.getBucket()); - assertEquals("then/", parent.getObject()); - assertEquals(2, parent.getNameCount()); - assertThat(path, Matchers.not(Matchers.equalTo(parent))); - assertTrue(path.startsWith(parent)); - assertFalse(parent.startsWith(path)); - assertTrue(parent.endsWith("then/")); - assertTrue(parent.startsWith("bucket/then")); - assertTrue(parent.isAbsolute()); - - GcsPath root = path.getRoot(); - assertEquals(0, root.getNameCount()); - assertEquals("gs://", root.toString()); - assertEquals("", root.getBucket()); - assertEquals("", root.getObject()); - assertTrue(root.isAbsolute()); - assertThat(root, Matchers.equalTo(parent.getRoot())); - - GcsPath grandParent = parent.getParent(); // gs://bucket/ - assertEquals(1, grandParent.getNameCount()); - assertEquals("gs://bucket/", grandParent.toString()); - assertTrue(grandParent.isAbsolute()); - assertThat(root, Matchers.equalTo(grandParent.getParent())); - assertThat(root.getParent(), Matchers.nullValue()); - - assertTrue(path.startsWith(path.getRoot())); - assertTrue(parent.startsWith(path.getRoot())); - } - - @Test - public void testRelativeParent() throws Exception { - GcsPath path = GcsPath.fromComponents(null, "a/b"); - GcsPath parent = path.getParent(); - assertEquals("a/", parent.toString()); - - GcsPath grandParent = parent.getParent(); - assertNull(grandParent); - } - - @Test - public void testUriSupport() throws Exception { - URI uri = URI.create("gs://bucket/some/path"); - - GcsPath path = GcsPath.fromUri(uri); - assertEquals("bucket", path.getBucket()); - assertEquals("some/path", path.getObject()); - - URI reconstructed = path.toUri(); - assertEquals(uri, reconstructed); - - path = GcsPath.fromUri("gs://bucket"); - assertEquals("gs://bucket/", path.toString()); - } - - @Test - public void testBucketParsing() throws Exception { - GcsPath path = GcsPath.fromUri("gs://bucket"); - GcsPath path2 = GcsPath.fromUri("gs://bucket/"); - - assertEquals(path, path2); - assertEquals(path.toString(), path2.toString()); - assertEquals(path.toUri(), path2.toUri()); - } - - @Test - public void testGcsPathToString() throws Exception { - String filename = "gs://some_bucket/some/file.txt"; - GcsPath path = GcsPath.fromUri(filename); - assertEquals(filename, path.toString()); - } - - @Test - public void testEquals() { - GcsPath a = GcsPath.fromComponents(null, "a/b/c"); - GcsPath a2 = GcsPath.fromComponents(null, "a/b/c"); - assertFalse(a.isAbsolute()); - assertFalse(a2.isAbsolute()); - - GcsPath b = GcsPath.fromComponents("bucket", "a/b/c"); - GcsPath b2 = GcsPath.fromComponents("bucket", "a/b/c"); - assertTrue(b.isAbsolute()); - assertTrue(b2.isAbsolute()); - - assertEquals(a, a); - assertThat(a, Matchers.not(Matchers.equalTo(b))); - assertThat(b, Matchers.not(Matchers.equalTo(a))); - - assertEquals(a, a2); - assertEquals(a2, a); - assertEquals(b, b2); - assertEquals(b2, b); - - assertThat(a, Matchers.not(Matchers.equalTo(Paths.get("/tmp/foo")))); - assertTrue(a != null); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidGcsPath() { - @SuppressWarnings("unused") - GcsPath filename = - GcsPath.fromUri("file://invalid/gcs/path"); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidBucket() { - GcsPath.fromComponents("invalid/", ""); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidObject_newline() { - GcsPath.fromComponents(null, "a\nb"); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidObject_cr() { - GcsPath.fromComponents(null, "a\rb"); - } - - @Test - public void testResolveUri() { - GcsPath path = GcsPath.fromComponents("bucket", "a/b/c"); - GcsPath d = path.resolve("gs://bucket2/d"); - assertEquals("gs://bucket2/d", d.toString()); - } - - @Test - public void testResolveOther() { - GcsPath a = GcsPath.fromComponents("bucket", "a"); - GcsPath b = a.resolve(Paths.get("b")); - assertEquals("a/b", b.getObject()); - } - - @Test - public void testGetFileName() { - assertEquals("foo", GcsPath.fromUri("gs://bucket/bar/foo").getFileName().toString()); - assertEquals("foo", GcsPath.fromUri("gs://bucket/foo").getFileName().toString()); - thrown.expect(UnsupportedOperationException.class); - GcsPath.fromUri("gs://bucket/").getFileName(); - } - - @Test - public void testResolveSibling() { - assertEquals( - "gs://bucket/bar/moo", - GcsPath.fromUri("gs://bucket/bar/foo").resolveSibling("moo").toString()); - assertEquals( - "gs://bucket/moo", - GcsPath.fromUri("gs://bucket/foo").resolveSibling("moo").toString()); - thrown.expect(UnsupportedOperationException.class); - GcsPath.fromUri("gs://bucket/").resolveSibling("moo"); - } - - @Test - public void testCompareTo() { - GcsPath a = GcsPath.fromComponents("bucket", "a"); - GcsPath b = GcsPath.fromComponents("bucket", "b"); - GcsPath b2 = GcsPath.fromComponents("bucket2", "b"); - GcsPath brel = GcsPath.fromComponents(null, "b"); - GcsPath a2 = GcsPath.fromComponents("bucket", "a"); - GcsPath arel = GcsPath.fromComponents(null, "a"); - - assertThat(a.compareTo(b), Matchers.lessThan(0)); - assertThat(b.compareTo(a), Matchers.greaterThan(0)); - assertThat(a.compareTo(a2), Matchers.equalTo(0)); - - assertThat(a.hashCode(), Matchers.equalTo(a2.hashCode())); - assertThat(a.hashCode(), Matchers.not(Matchers.equalTo(b.hashCode()))); - assertThat(b.hashCode(), Matchers.not(Matchers.equalTo(brel.hashCode()))); - - assertThat(brel.compareTo(b), Matchers.lessThan(0)); - assertThat(b.compareTo(brel), Matchers.greaterThan(0)); - assertThat(arel.compareTo(brel), Matchers.lessThan(0)); - assertThat(brel.compareTo(arel), Matchers.greaterThan(0)); - - assertThat(b.compareTo(b2), Matchers.lessThan(0)); - assertThat(b2.compareTo(b), Matchers.greaterThan(0)); - } - - @Test - public void testCompareTo_ordering() { - GcsPath ab = GcsPath.fromComponents("bucket", "a/b"); - GcsPath abc = GcsPath.fromComponents("bucket", "a/b/c"); - GcsPath a1b = GcsPath.fromComponents("bucket", "a-1/b"); - - assertThat(ab.compareTo(a1b), Matchers.lessThan(0)); - assertThat(a1b.compareTo(ab), Matchers.greaterThan(0)); - - assertThat(ab.compareTo(abc), Matchers.lessThan(0)); - assertThat(abc.compareTo(ab), Matchers.greaterThan(0)); - } - - @Test - public void testCompareTo_buckets() { - GcsPath a = GcsPath.fromComponents(null, "a/b/c"); - GcsPath b = GcsPath.fromComponents("bucket", "a/b/c"); - - assertThat(a.compareTo(b), Matchers.lessThan(0)); - assertThat(b.compareTo(a), Matchers.greaterThan(0)); - } - - @Test - public void testIterator() { - GcsPath a = GcsPath.fromComponents("bucket", "a/b/c"); - Iterator<Path> it = a.iterator(); - - assertTrue(it.hasNext()); - assertEquals("gs://bucket/", it.next().toString()); - assertTrue(it.hasNext()); - assertEquals("a", it.next().toString()); - assertTrue(it.hasNext()); - assertEquals("b", it.next().toString()); - assertTrue(it.hasNext()); - assertEquals("c", it.next().toString()); - assertFalse(it.hasNext()); - } - - @Test - public void testSubpath() { - GcsPath a = GcsPath.fromComponents("bucket", "a/b/c/d"); - assertThat(a.subpath(0, 1).toString(), Matchers.equalTo("gs://bucket/")); - assertThat(a.subpath(0, 2).toString(), Matchers.equalTo("gs://bucket/a")); - assertThat(a.subpath(0, 3).toString(), Matchers.equalTo("gs://bucket/a/b")); - assertThat(a.subpath(0, 4).toString(), Matchers.equalTo("gs://bucket/a/b/c")); - assertThat(a.subpath(1, 2).toString(), Matchers.equalTo("a")); - assertThat(a.subpath(2, 3).toString(), Matchers.equalTo("b")); - assertThat(a.subpath(2, 4).toString(), Matchers.equalTo("b/c")); - assertThat(a.subpath(2, 5).toString(), Matchers.equalTo("b/c/d")); - } - - @Test - public void testGetName() { - GcsPath a = GcsPath.fromComponents("bucket", "a/b/c/d"); - assertEquals(5, a.getNameCount()); - assertThat(a.getName(0).toString(), Matchers.equalTo("gs://bucket/")); - assertThat(a.getName(1).toString(), Matchers.equalTo("a")); - assertThat(a.getName(2).toString(), Matchers.equalTo("b")); - assertThat(a.getName(3).toString(), Matchers.equalTo("c")); - assertThat(a.getName(4).toString(), Matchers.equalTo("d")); - } - - @Test(expected = IllegalArgumentException.class) - public void testSubPathError() { - GcsPath a = GcsPath.fromComponents("bucket", "a/b/c/d"); - a.subpath(1, 1); // throws IllegalArgumentException - Assert.fail(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/pom.xml b/sdks/java/extensions/google-cloud-platform-core/pom.xml new file mode 100644 index 0000000..b1101ae --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/pom.xml @@ -0,0 +1,185 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-parent</artifactId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId> + <name>Apache Beam :: SDKs :: Java :: Extensions :: Google Cloud Platform Core</name> + <description>Common components used to support multiple + Google Cloud Platform specific maven modules.</description> + + <packaging>jar</packaging> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <excludedGroups> + org.apache.beam.sdk.testing.NeedsRunner + </excludedGroups> + <systemPropertyVariables> + <beamUseDummyRunner>true</beamUseDummyRunner> + </systemPropertyVariables> + </configuration> + </plugin> + + <!-- Coverage analysis for unit tests. --> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <dependency> + <groupId>com.google.http-client</groupId> + <artifactId>google-http-client-jackson2</artifactId> + </dependency> + + <dependency> + <groupId>com.google.auth</groupId> + <artifactId>google-auth-library-oauth2-http</artifactId> + </dependency> + + <dependency> + <groupId>com.google.api-client</groupId> + <artifactId>google-api-client</artifactId> + </dependency> + + <dependency> + <groupId>com.google.cloud.bigdataoss</groupId> + <artifactId>gcsio</artifactId> + </dependency> + + <dependency> + <groupId>com.google.cloud.bigdataoss</groupId> + <artifactId>util</artifactId> + </dependency> + + <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-cloudresourcemanager</artifactId> + </dependency> + + <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-storage</artifactId> + </dependency> + + <dependency> + <groupId>com.google.auth</groupId> + <artifactId>google-auth-library-credentials</artifactId> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + + <dependency> + <groupId>com.google.http-client</groupId> + <artifactId>google-http-client</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <!-- build dependencies --> + <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java new file mode 100644 index 0000000..6ab7b14 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.auth; + +import com.google.auth.Credentials; +import java.io.IOException; +import java.security.GeneralSecurityException; + +/** + * Construct an oauth credential to be used by the SDK and the SDK workers. + */ +public interface CredentialFactory { + Credentials getCredential() throws IOException, GeneralSecurityException; +} http://git-wip-us.apache.org/repos/asf/beam/blob/06f5a494/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java new file mode 100644 index 0000000..f999c63 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.auth; + +import com.google.auth.Credentials; +import com.google.auth.oauth2.GoogleCredentials; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Construct an oauth credential to be used by the SDK and the SDK workers. + * Returns a GCP credential. + */ +public class GcpCredentialFactory implements CredentialFactory { + /** + * The scope cloud-platform provides access to all Cloud Platform resources. + * cloud-platform isn't sufficient yet for talking to datastore so we request + * those resources separately. + * + * <p>Note that trusted scope relationships don't apply to OAuth tokens, so for + * services we access directly (GCS) as opposed to through the backend + * (BigQuery, GCE), we need to explicitly request that scope. + */ + private static final List<String> SCOPES = Arrays.asList( + "https://www.googleapis.com/auth/cloud-platform", + "https://www.googleapis.com/auth/devstorage.full_control", + "https://www.googleapis.com/auth/userinfo.email", + "https://www.googleapis.com/auth/datastore", + "https://www.googleapis.com/auth/pubsub"); + + private static final GcpCredentialFactory INSTANCE = new GcpCredentialFactory(); + + public static GcpCredentialFactory fromOptions(PipelineOptions options) { + return INSTANCE; + } + + /** + * Returns a default GCP {@link Credentials} or null when it fails. + */ + @Override + public Credentials getCredential() { + try { + return GoogleCredentials.getApplicationDefault().createScoped(SCOPES); + } catch (IOException e) { + // Ignore the exception + // Pipelines that only access to public data should be able to run without credentials. + return null; + } + } +}
