http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java new file mode 100644 index 0000000..6fac6dc --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.api.client.util.BackOff; + + +/** + * Implementation of {@link BackOff} that increases the back off period for each retry attempt + * using a randomization function that grows exponentially. + * + * <p>Example: The initial interval is .5 seconds and the maximum interval is 60 secs. + * For 14 tries the sequence will be (values in seconds): + * + * <pre> + * retry# retry_interval randomized_interval + * 1 0.5 [0.25, 0.75] + * 2 0.75 [0.375, 1.125] + * 3 1.125 [0.562, 1.687] + * 4 1.687 [0.8435, 2.53] + * 5 2.53 [1.265, 3.795] + * 6 3.795 [1.897, 5.692] + * 7 5.692 [2.846, 8.538] + * 8 8.538 [4.269, 12.807] + * 9 12.807 [6.403, 19.210] + * 10 28.832 [14.416, 43.248] + * 11 43.248 [21.624, 64.873] + * 12 60.0 [30.0, 90.0] + * 13 60.0 [30.0, 90.0] + * 14 60.0 [30.0, 90.0] + * </pre> + * + * <p>Implementation is not thread-safe. + */ +@Deprecated +public class IntervalBoundedExponentialBackOff implements BackOff { + public static final double DEFAULT_MULTIPLIER = 1.5; + public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5; + private final long maximumIntervalMillis; + private final long initialIntervalMillis; + private int currentAttempt; + + public IntervalBoundedExponentialBackOff(long maximumIntervalMillis, long initialIntervalMillis) { + checkArgument(maximumIntervalMillis > 0, "Maximum interval must be greater than zero."); + checkArgument(initialIntervalMillis > 0, "Initial interval must be greater than zero."); + this.maximumIntervalMillis = maximumIntervalMillis; + this.initialIntervalMillis = initialIntervalMillis; + reset(); + } + + @Override + public void reset() { + currentAttempt = 1; + } + + @Override + public long nextBackOffMillis() { + double currentIntervalMillis = + Math.min( + initialIntervalMillis * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1), + maximumIntervalMillis); + double randomOffset = + (Math.random() * 2 - 1) * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis; + currentAttempt += 1; + return Math.round(currentIntervalMillis + randomOffset); + } + + public boolean atMaxInterval() { + return initialIntervalMillis * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1) + >= maximumIntervalMillis; + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java new file mode 100644 index 0000000..f703e4c --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.auth.Credentials; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Construct an oauth credential to be used by the SDK and the SDK workers. + * Always returns a null Credential object. + */ +public class NoopCredentialFactory implements CredentialFactory { + private static final NoopCredentialFactory INSTANCE = new NoopCredentialFactory(); + private static final NoopCredentials NOOP_CREDENTIALS = new NoopCredentials(); + + public static NoopCredentialFactory fromOptions(PipelineOptions options) { + return INSTANCE; + } + + @Override + public Credentials getCredential() throws IOException { + return NOOP_CREDENTIALS; + } + + private static class NoopCredentials extends Credentials { + @Override + public String getAuthenticationType() { + return null; + } + + @Override + public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException { + return null; + } + + @Override + public boolean hasRequestMetadata() { + return false; + } + + @Override + public boolean hasRequestMetadataOnly() { + return false; + } + + @Override + public void refresh() throws IOException {} + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java new file mode 100644 index 0000000..4ed35c6 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java @@ -0,0 +1,62 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.beam.sdk.util; + +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpUnsuccessfulResponseHandler; +import java.io.IOException; + +/** + * A {@link HttpRequestInitializer} for requests that don't have credentials. + * + * <p>When the access is denied, it throws {@link IOException} with a detailed error message. + */ +public class NullCredentialInitializer implements HttpRequestInitializer { + private static final int ACCESS_DENIED = 401; + private static final String NULL_CREDENTIAL_REASON = + "Unable to get application default credentials. Please see " + + "https://developers.google.com/accounts/docs/application-default-credentials " + + "for details on how to specify credentials. This version of the SDK is " + + "dependent on the gcloud core component version 2015.02.05 or newer to " + + "be able to get credentials from the currently authorized user via gcloud auth."; + + @Override + public void initialize(HttpRequest httpRequest) throws IOException { + httpRequest.setUnsuccessfulResponseHandler(new NullCredentialHttpUnsuccessfulResponseHandler()); + } + + private static class NullCredentialHttpUnsuccessfulResponseHandler + implements HttpUnsuccessfulResponseHandler { + + @Override + public boolean handleResponse( + HttpRequest httpRequest, + HttpResponse httpResponse, boolean supportsRetry) throws IOException { + if (!httpResponse.isSuccessStatusCode() && httpResponse.getStatusCode() == ACCESS_DENIED) { + throwNullCredentialException(); + } + return supportsRetry; + } + } + + public static void throwNullCredentialException() { + throw new RuntimeException(NULL_CREDENTIAL_REASON); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java new file mode 100644 index 0000000..80c093b --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.cloudresourcemanager.CloudResourceManager; +import com.google.api.services.pubsub.Pubsub; +import com.google.api.services.storage.Storage; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.security.GeneralSecurityException; +import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.options.CloudResourceManagerOptions; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PubsubOptions; + +/** + * Helpers for cloud communication. + */ +public class Transport { + + private static class SingletonHelper { + /** Global instance of the JSON factory. */ + private static final JsonFactory JSON_FACTORY; + + /** Global instance of the HTTP transport. */ + private static final HttpTransport HTTP_TRANSPORT; + + static { + try { + JSON_FACTORY = JacksonFactory.getDefaultInstance(); + HTTP_TRANSPORT = GoogleNetHttpTransport.newTrustedTransport(); + } catch (GeneralSecurityException | IOException e) { + throw new RuntimeException(e); + } + } + } + + public static HttpTransport getTransport() { + return SingletonHelper.HTTP_TRANSPORT; + } + + public static JsonFactory getJsonFactory() { + return SingletonHelper.JSON_FACTORY; + } + + private static class ApiComponents { + public String rootUrl; + public String servicePath; + + public ApiComponents(String root, String path) { + this.rootUrl = root; + this.servicePath = path; + } + } + + private static ApiComponents apiComponentsFromUrl(String urlString) { + try { + URL url = new URL(urlString); + String rootUrl = url.getProtocol() + "://" + url.getHost() + + (url.getPort() > 0 ? ":" + url.getPort() : ""); + return new ApiComponents(rootUrl, url.getPath()); + } catch (MalformedURLException e) { + throw new RuntimeException("Invalid URL: " + urlString); + } + } + + /** + * Returns a BigQuery client builder using the specified {@link BigQueryOptions}. + */ + public static Bigquery.Builder + newBigQueryClient(BigQueryOptions options) { + return new Bigquery.Builder(getTransport(), getJsonFactory(), + chainHttpRequestInitializer( + options.getGcpCredential(), + // Do not log 404. It clutters the output and is possibly even required by the caller. + new RetryHttpRequestInitializer(ImmutableList.of(404)))) + .setApplicationName(options.getAppName()) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); + } + + /** + * Returns a Pubsub client builder using the specified {@link PubsubOptions}. + * + * @deprecated Use an appropriate org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory + */ + @Deprecated + public static Pubsub.Builder + newPubsubClient(PubsubOptions options) { + return new Pubsub.Builder(getTransport(), getJsonFactory(), + chainHttpRequestInitializer( + options.getGcpCredential(), + // Do not log 404. It clutters the output and is possibly even required by the caller. + new RetryHttpRequestInitializer(ImmutableList.of(404)))) + .setRootUrl(options.getPubsubRootUrl()) + .setApplicationName(options.getAppName()) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); + } + + /** + * Returns a CloudResourceManager client builder using the specified + * {@link CloudResourceManagerOptions}. + */ + public static CloudResourceManager.Builder + newCloudResourceManagerClient(CloudResourceManagerOptions options) { + Credentials credentials = options.getGcpCredential(); + if (credentials == null) { + NullCredentialInitializer.throwNullCredentialException(); + } + return new CloudResourceManager.Builder(getTransport(), getJsonFactory(), + chainHttpRequestInitializer( + credentials, + // Do not log 404. It clutters the output and is possibly even required by the caller. + new RetryHttpRequestInitializer(ImmutableList.of(404)))) + .setApplicationName(options.getAppName()) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); + } + + /** + * Returns a Cloud Storage client builder using the specified {@link GcsOptions}. + */ + public static Storage.Builder + newStorageClient(GcsOptions options) { + String servicePath = options.getGcsEndpoint(); + Storage.Builder storageBuilder = new Storage.Builder(getTransport(), getJsonFactory(), + chainHttpRequestInitializer( + options.getGcpCredential(), + // Do not log the code 404. Code up the stack will deal with 404's if needed, and + // logging it by default clutters the output during file staging. + new RetryHttpRequestInitializer( + ImmutableList.of(404), new UploadIdResponseInterceptor()))) + .setApplicationName(options.getAppName()) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); + if (servicePath != null) { + ApiComponents components = apiComponentsFromUrl(servicePath); + storageBuilder.setRootUrl(components.rootUrl); + storageBuilder.setServicePath(components.servicePath); + } + return storageBuilder; + } + + private static HttpRequestInitializer chainHttpRequestInitializer( + Credentials credential, HttpRequestInitializer httpRequestInitializer) { + if (credential == null) { + return new ChainingHttpRequestInitializer( + new NullCredentialInitializer(), httpRequestInitializer); + } else { + return new ChainingHttpRequestInitializer( + new HttpCredentialsAdapter(credential), + httpRequestInitializer); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java new file mode 100644 index 0000000..f8135e7 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Defines Google Cloud Platform component utilities that can be used by Beam runners. */ +package org.apache.beam.sdk.util; http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java new file mode 100644 index 0000000..37fb42d --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam; + +import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages; +import static org.hamcrest.MatcherAssert.assertThat; + +import com.google.common.collect.ImmutableSet; +import java.util.Set; +import org.apache.beam.sdk.util.ApiSurface; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** API surface verification for Google Cloud Platform core components. */ +@RunWith(JUnit4.class) +public class GcpCoreApiSurfaceTest { + + @Test + public void testApiSurface() throws Exception { + + @SuppressWarnings("unchecked") + final Set<String> allowed = + ImmutableSet.of( + "org.apache.beam", + "com.google.api.client", + "com.google.api.services.bigquery", + "com.google.api.services.cloudresourcemanager", + "com.google.api.services.pubsub", + "com.google.api.services.storage", + "com.google.auth", + "com.google.protobuf", + "com.fasterxml.jackson.annotation", + "com.fasterxml.jackson.core", + "com.fasterxml.jackson.databind", + "org.apache.avro", + "org.hamcrest", + // via DataflowMatchers + "org.codehaus.jackson", + // via Avro + "org.joda.time", + "org.junit"); + + assertThat( + ApiSurface.getSdkApiSurface(getClass().getClassLoader()), containsOnlyPackages(allowed)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java new file mode 100644 index 0000000..288383e --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.options; + +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.beam.sdk.options.GcpOptions.DefaultProjectFactory; +import org.apache.beam.sdk.testing.RestoreSystemProperties; +import org.apache.beam.sdk.util.NoopPathValidator; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestRule; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link GcpOptions}. */ +@RunWith(JUnit4.class) +public class GcpOptionsTest { + @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testGetProjectFromCloudSdkConfigEnv() throws Exception { + Map<String, String> environment = + ImmutableMap.of("CLOUDSDK_CONFIG", tmpFolder.getRoot().getAbsolutePath()); + assertEquals("test-project", + runGetProjectTest(tmpFolder.newFile("properties"), environment)); + } + + @Test + public void testGetProjectFromAppDataEnv() throws Exception { + Map<String, String> environment = + ImmutableMap.of("APPDATA", tmpFolder.getRoot().getAbsolutePath()); + System.setProperty("os.name", "windows"); + assertEquals("test-project", + runGetProjectTest(new File(tmpFolder.newFolder("gcloud"), "properties"), + environment)); + } + + @Test + public void testGetProjectFromUserHomeEnvOld() throws Exception { + Map<String, String> environment = ImmutableMap.of(); + System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); + assertEquals("test-project", + runGetProjectTest( + new File(tmpFolder.newFolder(".config", "gcloud"), "properties"), + environment)); + } + + @Test + public void testGetProjectFromUserHomeEnv() throws Exception { + Map<String, String> environment = ImmutableMap.of(); + System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); + assertEquals("test-project", + runGetProjectTest( + new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"), + environment)); + } + + @Test + public void testGetProjectFromUserHomeOldAndNewPrefersNew() throws Exception { + Map<String, String> environment = ImmutableMap.of(); + System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); + makePropertiesFileWithProject(new File(tmpFolder.newFolder(".config", "gcloud"), "properties"), + "old-project"); + assertEquals("test-project", + runGetProjectTest( + new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"), + environment)); + } + + @Test + public void testUnableToGetDefaultProject() throws Exception { + System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); + DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory()); + when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.<String, String>of()); + assertNull(projectFactory.create(PipelineOptionsFactory.create())); + } + + @Test + public void testEmptyGcpTempLocation() throws Exception { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + options.setProject(""); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("--project is a required option"); + options.getGcpTempLocation(); + } + + @Test + public void testDefaultGcpTempLocation() throws Exception { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + String tempLocation = "gs://bucket"; + options.setTempLocation(tempLocation); + options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class); + assertEquals(tempLocation, options.getGcpTempLocation()); + } + + @Test + public void testDefaultGcpTempLocationInvalid() throws Exception { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + options.setTempLocation("file://"); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Error constructing default value for gcpTempLocation: tempLocation is not" + + " a valid GCS path"); + options.getGcpTempLocation(); + } + + @Test + public void testDefaultGcpTempLocationDoesNotExist() { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + String tempLocation = "gs://does/not/exist"; + options.setTempLocation(tempLocation); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Error constructing default value for gcpTempLocation: tempLocation is not" + + " a valid GCS path"); + thrown.expectCause( + hasMessage(containsString("Output path does not exist or is not writeable"))); + + options.getGcpTempLocation(); + } + + private static void makePropertiesFileWithProject(File path, String projectId) + throws IOException { + String properties = String.format("[core]%n" + + "account = [email protected]%n" + + "project = %s%n" + + "%n" + + "[dataflow]%n" + + "magic = true%n", projectId); + Files.write(properties, path, StandardCharsets.UTF_8); + } + + private static String runGetProjectTest(File path, Map<String, String> environment) + throws Exception { + makePropertiesFileWithProject(path, "test-project"); + DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory()); + when(projectFactory.getEnvironment()).thenReturn(environment); + return projectFactory.create(PipelineOptionsFactory.create()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java new file mode 100644 index 0000000..dae7208 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.options; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.services.bigquery.Bigquery.Datasets.Delete; +import com.google.api.services.storage.Storage; +import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer; +import org.apache.beam.sdk.util.TestCredential; +import org.apache.beam.sdk.util.Transport; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link GoogleApiDebugOptions}. */ +@RunWith(JUnit4.class) +public class GoogleApiDebugOptionsTest { + private static final String STORAGE_GET_TRACE = + "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}"; + private static final String STORAGE_GET_AND_LIST_TRACE = + "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"," + + "\"Objects.List\":\"ListTraceDestination\"}"; + private static final String STORAGE_TRACE = "--googleApiTrace={\"Storage\":\"TraceDestination\"}"; + + @Test + public void testWhenTracingMatches() throws Exception { + String[] args = new String[] {STORAGE_GET_TRACE}; + GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); + options.setGcpCredential(new TestCredential()); + assertNotNull(options.getGoogleApiTrace()); + + Storage.Objects.Get request = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); + assertEquals("GetTraceDestination", request.get("$trace")); + } + + @Test + public void testWhenTracingDoesNotMatch() throws Exception { + String[] args = new String[] {STORAGE_GET_TRACE}; + GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); + options.setGcpCredential(new TestCredential()); + + assertNotNull(options.getGoogleApiTrace()); + + Storage.Objects.List request = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertNull(request.get("$trace")); + } + + @Test + public void testWithMultipleTraces() throws Exception { + String[] args = new String[] {STORAGE_GET_AND_LIST_TRACE}; + GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); + options.setGcpCredential(new TestCredential()); + + assertNotNull(options.getGoogleApiTrace()); + + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); + assertEquals("GetTraceDestination", getRequest.get("$trace")); + + Storage.Objects.List listRequest = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertEquals("ListTraceDestination", listRequest.get("$trace")); + } + + @Test + public void testMatchingAllCalls() throws Exception { + String[] args = new String[] {STORAGE_TRACE}; + GcsOptions options = + PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); + options.setGcpCredential(new TestCredential()); + + assertNotNull(options.getGoogleApiTrace()); + + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); + assertEquals("TraceDestination", getRequest.get("$trace")); + + Storage.Objects.List listRequest = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertEquals("TraceDestination", listRequest.get("$trace")); + } + + @Test + public void testMatchingAgainstClient() throws Exception { + GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); + options.setGcpCredential(new TestCredential()); + options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor( + Transport.newStorageClient(options).build(), "TraceDestination")); + + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); + assertEquals("TraceDestination", getRequest.get("$trace")); + + Delete deleteRequest = Transport.newBigQueryClient(options.as(BigQueryOptions.class)) + .build().datasets().delete("testProjectId", "testDatasetId"); + assertNull(deleteRequest.get("$trace")); + } + + @Test + public void testMatchingAgainstRequestType() throws Exception { + GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); + options.setGcpCredential(new TestCredential()); + options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor( + Transport.newStorageClient(options).build().objects() + .get("aProjectId", "aObjectId"), "TraceDestination")); + + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); + assertEquals("TraceDestination", getRequest.get("$trace")); + + Storage.Objects.List listRequest = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertNull(listRequest.get("$trace")); + } + + @Test + public void testDeserializationAndSerializationOfGoogleApiTracer() throws Exception { + String serializedValue = "{\"Api\":\"Token\"}"; + ObjectMapper objectMapper = new ObjectMapper(); + assertEquals(serializedValue, + objectMapper.writeValueAsString( + objectMapper.readValue(serializedValue, GoogleApiTracer.class))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java new file mode 100644 index 0000000..3b35856 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.QueryRequest; +import com.google.api.services.bigquery.model.QueryResponse; +import com.google.api.services.bigquery.model.TableCell; +import com.google.api.services.bigquery.model.TableRow; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.math.BigInteger; +import org.apache.beam.sdk.PipelineResult; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link BigqueryMatcher}. + */ +@RunWith(JUnit4.class) +public class BigqueryMatcherTest { + private final String appName = "test-app"; + private final String projectId = "test-project"; + private final String query = "test-query"; + + @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper(); + @Mock private Bigquery mockBigqueryClient; + @Mock private Bigquery.Jobs mockJobs; + @Mock private Bigquery.Jobs.Query mockQuery; + @Mock private PipelineResult mockResult; + + @Before + public void setUp() throws IOException { + MockitoAnnotations.initMocks(this); + when(mockBigqueryClient.jobs()).thenReturn(mockJobs); + when(mockJobs.query(anyString(), any(QueryRequest.class))).thenReturn(mockQuery); + } + + @Test + public void testBigqueryMatcherThatSucceeds() throws Exception { + BigqueryMatcher matcher = spy( + new BigqueryMatcher( + appName, projectId, query, "9bb47f5c90d2a99cad526453dff5ed5ec74650dc")); + doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString()); + when(mockQuery.execute()).thenReturn(createResponseContainingTestData()); + + assertThat(mockResult, matcher); + verify(matcher).newBigqueryClient(eq(appName)); + verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query))); + } + + @Test + public void testBigqueryMatcherFailsForChecksumMismatch() throws IOException { + BigqueryMatcher matcher = spy( + new BigqueryMatcher(appName, projectId, query, "incorrect-checksum")); + doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString()); + when(mockQuery.execute()).thenReturn(createResponseContainingTestData()); + + thrown.expect(AssertionError.class); + thrown.expectMessage("Total number of rows are: 1"); + thrown.expectMessage("abc"); + try { + assertThat(mockResult, matcher); + } finally { + verify(matcher).newBigqueryClient(eq(appName)); + verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query))); + } + } + + @Test + public void testBigqueryMatcherFailsWhenQueryJobNotComplete() throws Exception { + BigqueryMatcher matcher = spy( + new BigqueryMatcher(appName, projectId, query, "some-checksum")); + doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString()); + when(mockQuery.execute()).thenReturn(new QueryResponse().setJobComplete(false)); + + thrown.expect(AssertionError.class); + thrown.expectMessage("The query job hasn't completed."); + thrown.expectMessage("jobComplete=false"); + try { + assertThat(mockResult, matcher); + } finally { + verify(matcher).newBigqueryClient(eq(appName)); + verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query))); + } + } + + @Test + public void testQueryWithRetriesWhenServiceFails() throws Exception { + BigqueryMatcher matcher = spy( + new BigqueryMatcher(appName, projectId, query, "some-checksum")); + when(mockQuery.execute()).thenThrow(new IOException()); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Unable to get BigQuery response after retrying"); + try { + matcher.queryWithRetries( + mockBigqueryClient, + new QueryRequest(), + fastClock, + BigqueryMatcher.BACKOFF_FACTORY.backoff()); + } finally { + verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES)) + .query(eq(projectId), eq(new QueryRequest())); + } + } + + @Test + public void testQueryWithRetriesWhenQueryResponseNull() throws Exception { + BigqueryMatcher matcher = spy( + new BigqueryMatcher(appName, projectId, query, "some-checksum")); + when(mockQuery.execute()).thenReturn(null); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Unable to get BigQuery response after retrying"); + try { + matcher.queryWithRetries( + mockBigqueryClient, + new QueryRequest(), + fastClock, + BigqueryMatcher.BACKOFF_FACTORY.backoff()); + } finally { + verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES)) + .query(eq(projectId), eq(new QueryRequest())); + } + } + + private QueryResponse createResponseContainingTestData() { + TableCell field1 = new TableCell(); + field1.setV("abc"); + TableCell field2 = new TableCell(); + field2.setV("2"); + TableCell field3 = new TableCell(); + field3.setV("testing BigQuery matcher."); + TableRow row = new TableRow(); + row.setF(Lists.newArrayList(field1, field2, field3)); + + QueryResponse response = new QueryResponse(); + response.setJobComplete(true); + response.setRows(Lists.newArrayList(row)); + response.setTotalRows(BigInteger.ONE); + return response; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java new file mode 100644 index 0000000..395e1f3 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; + +import com.google.api.services.storage.model.Bucket; +import java.io.IOException; +import org.apache.beam.sdk.options.CloudResourceManagerOptions; +import org.apache.beam.sdk.options.GcpOptions; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.MockitoAnnotations; +import org.mockito.MockitoAnnotations.Mock; + +/** Tests for DefaultBucket. */ +@RunWith(JUnit4.class) +public class DefaultBucketTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private PipelineOptions options; + @Mock + private GcsUtil gcsUtil; + @Mock + private GcpProjectUtil gcpUtil; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + options = PipelineOptionsFactory.create(); + options.as(GcsOptions.class).setGcsUtil(gcsUtil); + options.as(CloudResourceManagerOptions.class).setGcpProjectUtil(gcpUtil); + options.as(GcpOptions.class).setProject("foo"); + options.as(GcpOptions.class).setZone("us-north1-a"); + } + + @Test + public void testCreateBucket() { + String bucket = DefaultBucket.tryCreateDefaultBucket(options); + assertEquals("gs://dataflow-staging-us-north1-0", bucket); + } + + @Test + public void testCreateBucketProjectLookupFails() throws IOException { + when(gcpUtil.getProjectNumber("foo")).thenThrow(new IOException("badness")); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Unable to verify project"); + DefaultBucket.tryCreateDefaultBucket(options); + } + + @Test + public void testCreateBucketCreateBucketFails() throws IOException { + doThrow(new IOException("badness")).when( + gcsUtil).createBucket(any(String.class), any(Bucket.class)); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Unable create default bucket"); + DefaultBucket.tryCreateDefaultBucket(options); + } + + @Test + public void testCannotGetBucketOwner() throws IOException { + when(gcsUtil.bucketOwner(any(GcsPath.class))) + .thenThrow(new IOException("badness")); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Unable to determine the owner"); + DefaultBucket.tryCreateDefaultBucket(options); + } + + @Test + public void testProjectMismatch() throws IOException { + when(gcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Bucket owner does not match the project"); + DefaultBucket.tryCreateDefaultBucket(options); + } + + @Test + public void regionFromZone() throws IOException { + assertEquals("us-central1", DefaultBucket.getRegionFromZone("us-central1-a")); + assertEquals("asia-east", DefaultBucket.getRegionFromZone("asia-east-a")); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java new file mode 100644 index 0000000..23f0418 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +import com.google.api.client.util.BackOff; +import com.google.api.services.cloudresourcemanager.CloudResourceManager; +import com.google.api.services.cloudresourcemanager.model.Project; +import java.net.SocketTimeoutException; +import org.apache.beam.sdk.options.CloudResourceManagerOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +/** Test case for {@link GcpProjectUtil}. */ +@RunWith(JUnit4.class) +public class GcpProjectUtilTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static CloudResourceManagerOptions crmOptionsWithTestCredential() { + CloudResourceManagerOptions pipelineOptions = + PipelineOptionsFactory.as(CloudResourceManagerOptions.class); + pipelineOptions.setGcpCredential(new TestCredential()); + return pipelineOptions; + } + + @Test + public void testGetProjectNumber() throws Exception { + CloudResourceManagerOptions pipelineOptions = crmOptionsWithTestCredential(); + GcpProjectUtil projectUtil = pipelineOptions.getGcpProjectUtil(); + + CloudResourceManager.Projects mockProjects = Mockito.mock( + CloudResourceManager.Projects.class); + CloudResourceManager mockCrm = Mockito.mock(CloudResourceManager.class); + projectUtil.setCrmClient(mockCrm); + + CloudResourceManager.Projects.Get mockProjectsGet = + Mockito.mock(CloudResourceManager.Projects.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + Project project = new Project(); + project.setProjectNumber(5L); + + when(mockCrm.projects()).thenReturn(mockProjects); + when(mockProjects.get(any(String.class))).thenReturn(mockProjectsGet); + when(mockProjectsGet.execute()) + .thenThrow(new SocketTimeoutException("SocketException")) + .thenReturn(project); + + assertEquals(5L, projectUtil.getProjectNumber( + "foo", mockBackOff, new FastNanoClockAndSleeper())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java new file mode 100644 index 0000000..a29dd45 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; +import java.util.ServiceLoader; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link GcsIOChannelFactoryRegistrar}. + */ +@RunWith(JUnit4.class) +public class GcsIOChannelFactoryRegistrarTest { + + @Test + public void testServiceLoader() { + for (IOChannelFactoryRegistrar registrar + : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) { + if (registrar instanceof GcsIOChannelFactoryRegistrar) { + return; + } + } + fail("Expected to find " + GcsIOChannelFactoryRegistrar.class); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java new file mode 100644 index 0000000..7248b38 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link GcsIOChannelFactoryTest}. */ +@RunWith(JUnit4.class) +public class GcsIOChannelFactoryTest { + private GcsIOChannelFactory factory; + + @Before + public void setUp() { + factory = GcsIOChannelFactory.fromOptions(PipelineOptionsFactory.as(GcsOptions.class)); + } + + @Test + public void testResolve() throws Exception { + assertEquals("gs://bucket/object", factory.resolve("gs://bucket", "object")); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java new file mode 100644 index 0000000..dc36319 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link GcsPathValidator}. */ +@RunWith(JUnit4.class) +public class GcsPathValidatorTest { + @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Mock private GcsUtil mockGcsUtil; + private GcsPathValidator validator; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true); + GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); + options.setGcpCredential(new TestCredential()); + options.setGcsUtil(mockGcsUtil); + validator = GcsPathValidator.fromOptions(options); + } + + @Test + public void testValidFilePattern() { + validator.validateInputFilePatternSupported("gs://bucket/path"); + } + + @Test + public void testInvalidFilePattern() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "Expected a valid 'gs://' path but was given '/local/path'"); + validator.validateInputFilePatternSupported("/local/path"); + } + + @Test + public void testWhenBucketDoesNotExist() throws Exception { + when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "Could not find file gs://non-existent-bucket/location"); + validator.validateInputFilePatternSupported("gs://non-existent-bucket/location"); + } + + @Test + public void testValidOutputPrefix() { + validator.validateOutputFilePrefixSupported("gs://bucket/path"); + } + + @Test + public void testInvalidOutputPrefix() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage( + "Expected a valid 'gs://' path but was given '/local/path'"); + validator.validateOutputFilePrefixSupported("/local/path"); + } +}
