Repository: incubator-beam Updated Branches: refs/heads/master c4089ee77 -> 479c19a0b
[BEAM-895] Allow empty GCP credential for pipelines that access public data. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b383b947 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b383b947 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b383b947 Branch: refs/heads/master Commit: b383b947cb5d75d735a2658f3348558d8e2c2d0a Parents: c4089ee Author: Pei He <[email protected]> Authored: Thu Nov 3 13:47:45 2016 -0700 Committer: Davor Bonaci <[email protected]> Committed: Thu Nov 17 10:51:35 2016 -0800 ---------------------------------------------------------------------- .../dataflow/util/DataflowTransport.java | 10 ++-- .../beam/sdk/util/GcpCredentialFactory.java | 13 ++-- .../beam/sdk/util/NoopCredentialFactory.java | 31 +++++++++- .../sdk/util/NullCredentialInitializer.java | 62 ++++++++++++++++++++ .../org/apache/beam/sdk/util/Transport.java | 9 ++- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 4 ++ 6 files changed, 115 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b383b947/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java index e0026de..7f3b6c7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java @@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableList; import 
java.net.MalformedURLException; import java.net.URL; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.sdk.util.NullCredentialInitializer; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; /** @@ -94,11 +95,10 @@ public class DataflowTransport { private static HttpRequestInitializer chainHttpRequestInitializer( Credentials credential, HttpRequestInitializer httpRequestInitializer) { if (credential == null) { - return httpRequestInitializer; - } else { - return new ChainingHttpRequestInitializer( - new HttpCredentialsAdapter(credential), - httpRequestInitializer); + NullCredentialInitializer.throwNullCredentialException(); } + return new ChainingHttpRequestInitializer( + new HttpCredentialsAdapter(credential), + httpRequestInitializer); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b383b947/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java index feb93f7..e1fa18f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java @@ -51,16 +51,17 @@ public class GcpCredentialFactory implements CredentialFactory { return INSTANCE; } + /** + * Returns a default GCP {@link Credentials} or null when it fails. + */ @Override - public Credentials getCredential() throws IOException { + public Credentials getCredential() { try { return GoogleCredentials.getApplicationDefault().createScoped(SCOPES); } catch (IOException e) { - throw new RuntimeException("Unable to get application default credentials. 
Please see " - + "https://developers.google.com/accounts/docs/application-default-credentials " - + "for details on how to specify credentials. This version of the SDK is " - + "dependent on the gcloud core component version 2015.02.05 or newer to " - + "be able to get credentials from the currently authorized user via gcloud auth.", e); + // Ignore the exception + // Pipelines that only access to public data should be able to run without credentials. + return null; } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b383b947/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java index 29c3e72..f703e4c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java @@ -19,6 +19,9 @@ package org.apache.beam.sdk.util; import com.google.auth.Credentials; import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; import org.apache.beam.sdk.options.PipelineOptions; /** @@ -27,6 +30,7 @@ import org.apache.beam.sdk.options.PipelineOptions; */ public class NoopCredentialFactory implements CredentialFactory { private static final NoopCredentialFactory INSTANCE = new NoopCredentialFactory(); + private static final NoopCredentials NOOP_CREDENTIALS = new NoopCredentials(); public static NoopCredentialFactory fromOptions(PipelineOptions options) { return INSTANCE; @@ -34,6 +38,31 @@ public class NoopCredentialFactory implements CredentialFactory { @Override public Credentials getCredential() throws IOException { - return null; + return NOOP_CREDENTIALS; + } + + private static class NoopCredentials extends Credentials { + @Override + public 
String getAuthenticationType() { + return null; + } + + @Override + public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException { + return null; + } + + @Override + public boolean hasRequestMetadata() { + return false; + } + + @Override + public boolean hasRequestMetadataOnly() { + return false; + } + + @Override + public void refresh() throws IOException {} } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b383b947/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java new file mode 100644 index 0000000..4ed35c6 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java @@ -0,0 +1,62 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.beam.sdk.util; + +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpUnsuccessfulResponseHandler; +import java.io.IOException; + +/** + * A {@link HttpRequestInitializer} for requests that don't have credentials. + * + * <p>When the access is denied, it throws {@link IOException} with a detailed error message. + */ +public class NullCredentialInitializer implements HttpRequestInitializer { + private static final int ACCESS_DENIED = 401; + private static final String NULL_CREDENTIAL_REASON = + "Unable to get application default credentials. Please see " + + "https://developers.google.com/accounts/docs/application-default-credentials " + + "for details on how to specify credentials. This version of the SDK is " + + "dependent on the gcloud core component version 2015.02.05 or newer to " + + "be able to get credentials from the currently authorized user via gcloud auth."; + + @Override + public void initialize(HttpRequest httpRequest) throws IOException { + httpRequest.setUnsuccessfulResponseHandler(new NullCredentialHttpUnsuccessfulResponseHandler()); + } + + private static class NullCredentialHttpUnsuccessfulResponseHandler + implements HttpUnsuccessfulResponseHandler { + + @Override + public boolean handleResponse( + HttpRequest httpRequest, + HttpResponse httpResponse, boolean supportsRetry) throws IOException { + if (!httpResponse.isSuccessStatusCode() && httpResponse.getStatusCode() == ACCESS_DENIED) { + throwNullCredentialException(); + } + return supportsRetry; + } + } + + public static void throwNullCredentialException() { + throw new RuntimeException(NULL_CREDENTIAL_REASON); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b383b947/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java ---------------------------------------------------------------------- diff 
--git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java index 38eecc2..1edfa1d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java @@ -129,9 +129,13 @@ public class Transport { */ public static CloudResourceManager.Builder newCloudResourceManagerClient(CloudResourceManagerOptions options) { + Credentials credentials = options.getGcpCredential(); + if (credentials == null) { + NullCredentialInitializer.throwNullCredentialException(); + } return new CloudResourceManager.Builder(getTransport(), getJsonFactory(), chainHttpRequestInitializer( - options.getGcpCredential(), + credentials, // Do not log 404. It clutters the output and is possibly even required by the caller. new RetryHttpRequestInitializer(ImmutableList.of(404)))) .setApplicationName(options.getAppName()) @@ -164,7 +168,8 @@ public class Transport { private static HttpRequestInitializer chainHttpRequestInitializer( Credentials credential, HttpRequestInitializer httpRequestInitializer) { if (credential == null) { - return httpRequestInitializer; + return new ChainingHttpRequestInitializer( + new NullCredentialInitializer(), httpRequestInitializer); } else { return new ChainingHttpRequestInitializer( new HttpCredentialsAdapter(credential), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b383b947/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 7c9b3e0..0e01246 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2333,6 +2333,8 @@ public class BigQueryIO { throw new IllegalArgumentException( String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table)), e); + } else if (e instanceof RuntimeException) { + throw (RuntimeException) e; } else { throw new RuntimeException( String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", @@ -2350,6 +2352,8 @@ public class BigQueryIO { if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) { throw new IllegalArgumentException( String.format(RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table)), e); + } else if (e instanceof RuntimeException) { + throw (RuntimeException) e; } else { throw new RuntimeException( String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table",
