[BEAM-1871] Create a new GCP core module package and move several GCP-related classes over from beam-sdks-java-core.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/be92f595 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/be92f595 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/be92f595 Branch: refs/heads/master Commit: be92f5952706c2cbb980df5073fbc74925ed68e6 Parents: ed52d32 Author: Luke Cwik <[email protected]> Authored: Mon Apr 17 18:02:02 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Tue Apr 18 16:18:53 2017 -0700 ---------------------------------------------------------------------- examples/java/pom.xml | 5 + examples/java8/pom.xml | 5 + pom.xml | 6 + runners/flink/examples/pom.xml | 4 + runners/google-cloud-dataflow-java/pom.xml | 5 + sdks/java/core/pom.xml | 46 +- .../beam/sdk/options/BigQueryOptions.java | 32 - .../options/CloudResourceManagerOptions.java | 40 - .../DefaultPipelineOptionsRegistrar.java | 5 - .../org/apache/beam/sdk/options/GcpOptions.java | 227 ------ .../org/apache/beam/sdk/options/GcsOptions.java | 158 ---- .../beam/sdk/options/GoogleApiDebugOptions.java | 87 -- .../beam/sdk/options/PipelineOptions.java | 4 +- .../apache/beam/sdk/options/PubsubOptions.java | 36 - .../apache/beam/sdk/runners/PipelineRunner.java | 2 + .../beam/sdk/testing/BigqueryMatcher.java | 256 ------ .../apache/beam/sdk/testing/TestPipeline.java | 3 - .../beam/sdk/util/AppEngineEnvironment.java | 62 -- .../apache/beam/sdk/util/CredentialFactory.java | 29 - .../org/apache/beam/sdk/util/DefaultBucket.java | 105 --- .../beam/sdk/util/GcpCredentialFactory.java | 67 -- .../apache/beam/sdk/util/GcpProjectUtil.java | 106 --- .../beam/sdk/util/GcsIOChannelFactory.java | 111 --- .../sdk/util/GcsIOChannelFactoryRegistrar.java | 38 - .../apache/beam/sdk/util/GcsPathValidator.java | 95 --- .../java/org/apache/beam/sdk/util/GcsUtil.java | 798 ------------------- .../util/IntervalBoundedExponentialBackOff.java | 89 --- .../beam/sdk/util/NoopCredentialFactory.java | 68 -- 
.../sdk/util/NullCredentialInitializer.java | 62 -- .../org/apache/beam/sdk/util/Transport.java | 178 ----- .../org/apache/beam/SdkCoreApiSurfaceTest.java | 2 - .../java/org/apache/beam/sdk/io/TextIOTest.java | 97 +-- .../apache/beam/sdk/options/GcpOptionsTest.java | 171 ---- .../sdk/options/GoogleApiDebugOptionsTest.java | 145 ---- .../sdk/options/PipelineOptionsFactoryTest.java | 4 +- .../beam/sdk/runners/PipelineRunnerTest.java | 46 +- .../beam/sdk/testing/BigqueryMatcherTest.java | 176 ---- .../beam/sdk/testing/TestPipelineTest.java | 6 +- .../apache/beam/sdk/util/DefaultBucketTest.java | 112 --- .../beam/sdk/util/GcpProjectUtilTest.java | 76 -- .../util/GcsIOChannelFactoryRegistrarTest.java | 44 - .../beam/sdk/util/GcsIOChannelFactoryTest.java | 43 - .../beam/sdk/util/GcsPathValidatorTest.java | 87 -- .../org/apache/beam/sdk/util/GcsUtilTest.java | 798 ------------------- .../IntervalBoundedExponentialBackOffTest.java | 100 --- .../util/RetryHttpRequestInitializerTest.java | 290 ------- sdks/java/extensions/gcp-core/pom.xml | 217 +++++ .../beam/sdk/options/BigQueryOptions.java | 32 + .../options/CloudResourceManagerOptions.java | 40 + .../org/apache/beam/sdk/options/GcpOptions.java | 227 ++++++ .../options/GcpPipelineOptionsRegistrar.java | 39 + .../org/apache/beam/sdk/options/GcsOptions.java | 158 ++++ .../beam/sdk/options/GoogleApiDebugOptions.java | 87 ++ .../apache/beam/sdk/options/PubsubOptions.java | 36 + .../apache/beam/sdk/options/package-info.java | 22 + .../beam/sdk/testing/BigqueryMatcher.java | 256 ++++++ .../apache/beam/sdk/testing/package-info.java | 21 + .../beam/sdk/util/AppEngineEnvironment.java | 62 ++ .../apache/beam/sdk/util/CredentialFactory.java | 29 + .../org/apache/beam/sdk/util/DefaultBucket.java | 105 +++ .../beam/sdk/util/GcpCredentialFactory.java | 67 ++ .../apache/beam/sdk/util/GcpProjectUtil.java | 106 +++ .../beam/sdk/util/GcsIOChannelFactory.java | 111 +++ .../sdk/util/GcsIOChannelFactoryRegistrar.java | 38 + 
.../apache/beam/sdk/util/GcsPathValidator.java | 95 +++ .../java/org/apache/beam/sdk/util/GcsUtil.java | 798 +++++++++++++++++++ .../util/IntervalBoundedExponentialBackOff.java | 89 +++ .../beam/sdk/util/NoopCredentialFactory.java | 68 ++ .../sdk/util/NullCredentialInitializer.java | 62 ++ .../org/apache/beam/sdk/util/Transport.java | 178 +++++ .../org/apache/beam/sdk/util/package-info.java | 20 + .../org/apache/beam/GcpCoreApiSurfaceTest.java | 62 ++ .../apache/beam/sdk/options/GcpOptionsTest.java | 171 ++++ .../sdk/options/GoogleApiDebugOptionsTest.java | 145 ++++ .../beam/sdk/testing/BigqueryMatcherTest.java | 176 ++++ .../apache/beam/sdk/util/DefaultBucketTest.java | 112 +++ .../beam/sdk/util/GcpProjectUtilTest.java | 76 ++ .../util/GcsIOChannelFactoryRegistrarTest.java | 44 + .../beam/sdk/util/GcsIOChannelFactoryTest.java | 43 + .../beam/sdk/util/GcsPathValidatorTest.java | 87 ++ .../org/apache/beam/sdk/util/GcsUtilTest.java | 798 +++++++++++++++++++ .../IntervalBoundedExponentialBackOffTest.java | 100 +++ .../util/RetryHttpRequestInitializerTest.java | 290 +++++++ sdks/java/extensions/pom.xml | 1 + sdks/java/harness/pom.xml | 5 + sdks/java/io/google-cloud-platform/pom.xml | 20 +- 86 files changed, 5130 insertions(+), 4889 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/examples/java/pom.xml ---------------------------------------------------------------------- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 021a819..ae3d63d 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -461,6 +461,11 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-gcp-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/examples/java8/pom.xml 
---------------------------------------------------------------------- diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml index 912c341..cd69acb 100644 --- a/examples/java8/pom.xml +++ b/examples/java8/pom.xml @@ -208,6 +208,11 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-gcp-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 09f3985..306978d 100644 --- a/pom.xml +++ b/pom.xml @@ -357,6 +357,12 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-gcp-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-sorter</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/runners/flink/examples/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml index fa642bd..aaf76d9 100644 --- a/runners/flink/examples/pom.xml +++ b/runners/flink/examples/pom.xml @@ -65,6 +65,10 @@ </profiles> <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-gcp-core</artifactId> + </dependency> <dependency> <groupId>org.apache.beam</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index ff63a31..68d433a 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ 
b/runners/google-cloud-dataflow-java/pom.xml @@ -171,6 +171,11 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-gcp-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-common-runner-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 0ac40f4..4ba8e3b 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -145,38 +145,30 @@ </dependency> <dependency> - <groupId>com.google.auth</groupId> - <artifactId>google-auth-library-oauth2-http</artifactId> - </dependency> - - <dependency> - <groupId>com.google.api-client</groupId> - <artifactId>google-api-client</artifactId> - </dependency> - - <dependency> <groupId>com.google.apis</groupId> <artifactId>google-api-services-bigquery</artifactId> </dependency> <dependency> <groupId>com.google.apis</groupId> - <artifactId>google-api-services-cloudresourcemanager</artifactId> + <artifactId>google-api-services-storage</artifactId> </dependency> <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-pubsub</artifactId> + <groupId>com.google.http-client</groupId> + <artifactId>google-http-client</artifactId> </dependency> <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-storage</artifactId> + <groupId>com.google.cloud.bigdataoss</groupId> + <artifactId>gcsio</artifactId> + <scope>runtime</scope> </dependency> <dependency> - <groupId>com.google.http-client</groupId> - <artifactId>google-http-client</artifactId> + <groupId>com.google.cloud.bigdataoss</groupId> + <artifactId>util</artifactId> + <scope>runtime</scope> </dependency> <!-- Required by com.google.apis:google-api-services-datastore-protobuf, but @@ -189,31 +181,11 @@ <dependency> 
<groupId>com.google.http-client</groupId> - <artifactId>google-http-client-jackson2</artifactId> - </dependency> - - <dependency> - <groupId>com.google.http-client</groupId> <artifactId>google-http-client-protobuf</artifactId> <scope>runtime</scope> </dependency> <dependency> - <groupId>com.google.oauth-client</groupId> - <artifactId>google-oauth-client</artifactId> - </dependency> - - <dependency> - <groupId>com.google.cloud.bigdataoss</groupId> - <artifactId>gcsio</artifactId> - </dependency> - - <dependency> - <groupId>com.google.cloud.bigdataoss</groupId> - <artifactId>util</artifactId> - </dependency> - - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java deleted file mode 100644 index 7672cd7..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.options; - -/** - * Properties needed when using Google BigQuery with the Apache Beam SDK. - */ -@Description("Options that are used to configure Google BigQuery. See " - + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.") -public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions, - PipelineOptions, StreamingOptions { - @Description("Temporary dataset for BigQuery table operations. " - + "Supported values are \"bigquery.googleapis.com/{dataset}\"") - @Default.String("bigquery.googleapis.com/cloud_dataflow") - String getTempDatasetId(); - void setTempDatasetId(String value); -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java deleted file mode 100644 index 13fdaf3..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.options; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import org.apache.beam.sdk.util.GcpProjectUtil; - -/** - * Properties needed when using Google CloudResourceManager with the Apache Beam SDK. - */ -@Description("Options that are used to configure Google CloudResourceManager. See " - + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.") -public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions, - PipelineOptions, StreamingOptions { - /** - * The GcpProjectUtil instance that should be used to communicate with Google Cloud Storage. 
- */ - @JsonIgnore - @Description("The GcpProjectUtil instance that should be used to communicate" - + " with Google Cloud Resource Manager.") - @Default.InstanceFactory(GcpProjectUtil.GcpProjectUtilFactory.class) - @Hidden - GcpProjectUtil getGcpProjectUtil(); - void setGcpProjectUtil(GcpProjectUtil value); -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java index 069c109..b0ce812 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DefaultPipelineOptionsRegistrar.java @@ -32,11 +32,6 @@ public class DefaultPipelineOptionsRegistrar implements PipelineOptionsRegistrar .add(PipelineOptions.class) .add(ApplicationNameOptions.class) .add(StreamingOptions.class) - .add(BigQueryOptions.class) - .add(GcpOptions.class) - .add(GcsOptions.class) - .add(GoogleApiDebugOptions.class) - .add(PubsubOptions.class) .build(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java deleted file mode 100644 index d01406f..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.options; - -import static com.google.common.base.Strings.isNullOrEmpty; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.auth.Credentials; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.io.Files; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.security.GeneralSecurityException; -import java.util.Locale; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.annotation.Nullable; -import org.apache.beam.sdk.util.CredentialFactory; -import org.apache.beam.sdk.util.DefaultBucket; -import org.apache.beam.sdk.util.GcpCredentialFactory; -import org.apache.beam.sdk.util.InstanceBuilder; -import org.apache.beam.sdk.util.PathValidator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Options used to configure Google Cloud Platform specific options such as the project - * and credentials. - * - * <p>These options defer to the - * <a href="https://developers.google.com/accounts/docs/application-default-credentials"> - * application default credentials</a> for authentication. 
See the - * <a href="https://github.com/google/google-auth-library-java">Google Auth Library</a> for - * alternative mechanisms for creating credentials. - */ -@Description("Options used to configure Google Cloud Platform project and credentials.") -public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { - /** - * Project id to use when launching jobs. - */ - @Description("Project id. Required when using Google Cloud Platform services. " - + "See https://cloud.google.com/storage/docs/projects for further details.") - @Default.InstanceFactory(DefaultProjectFactory.class) - String getProject(); - void setProject(String value); - - /** - * GCP <a href="https://developers.google.com/compute/docs/zones" - * >availability zone</a> for operations. - * - * <p>Default is set on a per-service basis. - */ - @Description("GCP availability zone for running GCP operations. " - + "Default is up to the individual service.") - String getZone(); - void setZone(String value); - - /** - * The class of the credential factory that should be created and used to create - * credentials. If gcpCredential has not been set explicitly, an instance of this class will - * be constructed and used as a credential factory. - */ - @Description("The class of the credential factory that should be created and used to create " - + "credentials. If gcpCredential has not been set explicitly, an instance of this class will " - + "be constructed and used as a credential factory.") - @Default.Class(GcpCredentialFactory.class) - Class<? extends CredentialFactory> getCredentialFactoryClass(); - void setCredentialFactoryClass( - Class<? extends CredentialFactory> credentialFactoryClass); - - /** - * The credential instance that should be used to authenticate against GCP services. - * If no credential has been set explicitly, the default is to use the instance factory - * that constructs a credential based upon the currently set credentialFactoryClass. 
- */ - @JsonIgnore - @Description("The credential instance that should be used to authenticate against GCP services. " - + "If no credential has been set explicitly, the default is to use the instance factory " - + "that constructs a credential based upon the currently set credentialFactoryClass.") - @Default.InstanceFactory(GcpUserCredentialsFactory.class) - Credentials getGcpCredential(); - void setGcpCredential(Credentials value); - - /** - * Attempts to infer the default project based upon the environment this application - * is executing within. Currently this only supports getting the default project from gcloud. - */ - class DefaultProjectFactory implements DefaultValueFactory<String> { - private static final Logger LOG = LoggerFactory.getLogger(DefaultProjectFactory.class); - - @Override - public String create(PipelineOptions options) { - try { - File configFile; - if (getEnvironment().containsKey("CLOUDSDK_CONFIG")) { - configFile = new File(getEnvironment().get("CLOUDSDK_CONFIG"), "properties"); - } else if (isWindows() && getEnvironment().containsKey("APPDATA")) { - configFile = new File(getEnvironment().get("APPDATA"), "gcloud/properties"); - } else { - // New versions of gcloud use this file - configFile = new File( - System.getProperty("user.home"), - ".config/gcloud/configurations/config_default"); - if (!configFile.exists()) { - // Old versions of gcloud use this file - configFile = new File(System.getProperty("user.home"), ".config/gcloud/properties"); - } - } - String section = null; - Pattern projectPattern = Pattern.compile("^project\\s*=\\s*(.*)$"); - Pattern sectionPattern = Pattern.compile("^\\[(.*)\\]$"); - for (String line : Files.readLines(configFile, StandardCharsets.UTF_8)) { - line = line.trim(); - if (line.isEmpty() || line.startsWith(";")) { - continue; - } - Matcher matcher = sectionPattern.matcher(line); - if (matcher.matches()) { - section = matcher.group(1); - } else if (section == null || section.equals("core")) { - matcher = 
projectPattern.matcher(line); - if (matcher.matches()) { - String project = matcher.group(1).trim(); - LOG.info("Inferred default GCP project '{}' from gcloud. If this is the incorrect " - + "project, please cancel this Pipeline and specify the command-line " - + "argument --project.", project); - return project; - } - } - } - } catch (IOException expected) { - LOG.debug("Failed to find default project.", expected); - } - // return null if can't determine - return null; - } - - /** - * Returns true if running on the Windows OS. - */ - private static boolean isWindows() { - return System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("windows"); - } - - /** - * Used to mock out getting environment variables. - */ - @VisibleForTesting - Map<String, String> getEnvironment() { - return System.getenv(); - } - } - - /** - * Attempts to load the GCP credentials. See - * {@link CredentialFactory#getCredential()} for more details. - */ - class GcpUserCredentialsFactory implements DefaultValueFactory<Credentials> { - @Override - public Credentials create(PipelineOptions options) { - GcpOptions gcpOptions = options.as(GcpOptions.class); - try { - CredentialFactory factory = InstanceBuilder.ofType(CredentialFactory.class) - .fromClass(gcpOptions.getCredentialFactoryClass()) - .fromFactoryMethod("fromOptions") - .withArg(PipelineOptions.class, options) - .build(); - return factory.getCredential(); - } catch (IOException | GeneralSecurityException e) { - throw new RuntimeException("Unable to obtain credential", e); - } - } - } - - /** - * A GCS path for storing temporary files in GCP. - * - * <p>Its default to {@link PipelineOptions#getTempLocation}. - */ - @Description("A GCS path for storing temporary files in GCP.") - @Default.InstanceFactory(GcpTempLocationFactory.class) - @Nullable String getGcpTempLocation(); - void setGcpTempLocation(String value); - - /** - * Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location. 
- */ - class GcpTempLocationFactory implements DefaultValueFactory<String> { - - @Override - @Nullable - public String create(PipelineOptions options) { - String tempLocation = options.getTempLocation(); - if (isNullOrEmpty(tempLocation)) { - tempLocation = DefaultBucket.tryCreateDefaultBucket(options); - options.setTempLocation(tempLocation); - } else { - try { - PathValidator validator = options.as(GcsOptions.class).getPathValidator(); - validator.validateOutputFilePrefixSupported(tempLocation); - } catch (Exception e) { - throw new IllegalArgumentException(String.format( - "Error constructing default value for gcpTempLocation: tempLocation is not" - + " a valid GCS path, %s. ", tempLocation), e); - } - } - return tempLocation; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java deleted file mode 100644 index 2187e7d..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.options; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; -import org.apache.beam.sdk.util.AppEngineEnvironment; -import org.apache.beam.sdk.util.GcsPathValidator; -import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.InstanceBuilder; -import org.apache.beam.sdk.util.PathValidator; - -/** - * Options used to configure Google Cloud Storage. - */ -public interface GcsOptions extends - ApplicationNameOptions, GcpOptions, PipelineOptions { - /** - * The GcsUtil instance that should be used to communicate with Google Cloud Storage. - */ - @JsonIgnore - @Description("The GcsUtil instance that should be used to communicate with Google Cloud Storage.") - @Default.InstanceFactory(GcsUtil.GcsUtilFactory.class) - @Hidden - GcsUtil getGcsUtil(); - void setGcsUtil(GcsUtil value); - - /** - * The ExecutorService instance to use to create threads, can be overridden to specify an - * ExecutorService that is compatible with the users environment. If unset, the - * default is to create an ExecutorService with an unbounded number of threads; this - * is compatible with Google AppEngine. - */ - @JsonIgnore - @Description("The ExecutorService instance to use to create multiple threads. Can be overridden " - + "to specify an ExecutorService that is compatible with the users environment. 
If unset, " - + "the default is to create an ExecutorService with an unbounded number of threads; this " - + "is compatible with Google AppEngine.") - @Default.InstanceFactory(ExecutorServiceFactory.class) - @Hidden - ExecutorService getExecutorService(); - void setExecutorService(ExecutorService value); - - /** - * GCS endpoint to use. If unspecified, uses the default endpoint. - */ - @JsonIgnore - @Hidden - @Description("The URL for the GCS API.") - String getGcsEndpoint(); - void setGcsEndpoint(String value); - - /** - * The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for - * {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the - * restrictions and performance implications of this value. - */ - @Description("The buffer size (in bytes) to use when uploading files to GCS. Please see the " - + "documentation for AbstractGoogleAsyncWriteChannel.setUploadBufferSize for more " - + "information on the restrictions and performance implications of this value.\n\n" - + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/" - + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java") - @Nullable - Integer getGcsUploadBufferSizeBytes(); - void setGcsUploadBufferSizeBytes(@Nullable Integer bytes); - - /** - * The class of the validator that should be created and used to validate paths. - * If pathValidator has not been set explicitly, an instance of this class will be - * constructed and used as the path validator. - */ - @Description("The class of the validator that should be created and used to validate paths. " - + "If pathValidator has not been set explicitly, an instance of this class will be " - + "constructed and used as the path validator.") - @Default.Class(GcsPathValidator.class) - Class<? extends PathValidator> getPathValidatorClass(); - void setPathValidatorClass(Class<? 
extends PathValidator> validatorClass); - - /** - * The path validator instance that should be used to validate paths. - * If no path validator has been set explicitly, the default is to use the instance factory that - * constructs a path validator based upon the currently set pathValidatorClass. - */ - @JsonIgnore - @Description("The path validator instance that should be used to validate paths. " - + "If no path validator has been set explicitly, the default is to use the instance factory " - + "that constructs a path validator based upon the currently set pathValidatorClass.") - @Default.InstanceFactory(PathValidatorFactory.class) - PathValidator getPathValidator(); - void setPathValidator(PathValidator validator); - - /** - * Returns the default {@link ExecutorService} to use within the Apache Beam SDK. The - * {@link ExecutorService} is compatible with AppEngine. - */ - class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> { - @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only. - @Override - public ExecutorService create(PipelineOptions options) { - ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder(); - threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory()); - if (!AppEngineEnvironment.IS_APP_ENGINE) { - // AppEngine doesn't allow modification of threads to be daemon threads. - threadFactoryBuilder.setDaemon(true); - } - /* The SDK requires an unbounded thread pool because a step may create X writers - * each requiring their own thread to perform the writes otherwise a writer may - * block causing deadlock for the step because the writers buffer is full. - * Also, the MapTaskExecutor launches the steps in reverse order and completes - * them in forward order thus requiring enough threads so that each step's writers - * can be active. - */ - return new ThreadPoolExecutor( - 0, Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads. 
- Long.MAX_VALUE, TimeUnit.NANOSECONDS, // Keep non-core threads alive forever. - new SynchronousQueue<Runnable>(), - threadFactoryBuilder.build()); - } - } - - /** - * Creates a {@link PathValidator} object using the class specified in - * {@link #getPathValidatorClass()}. - */ - class PathValidatorFactory implements DefaultValueFactory<PathValidator> { - @Override - public PathValidator create(PipelineOptions options) { - GcsOptions gcsOptions = options.as(GcsOptions.class); - return InstanceBuilder.ofType(PathValidator.class) - .fromClass(gcsOptions.getPathValidatorClass()) - .fromFactoryMethod("fromOptions") - .withArg(PipelineOptions.class, options) - .build(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java deleted file mode 100644 index f9cb575..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GoogleApiDebugOptions.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.options; - -import com.google.api.client.googleapis.services.AbstractGoogleClient; -import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; -import com.google.api.client.googleapis.services.GoogleClientRequestInitializer; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - * These options configure debug settings for Google API clients created within the Apache Beam SDK. - */ -public interface GoogleApiDebugOptions extends PipelineOptions { - /** - * This option enables tracing of API calls to Google services used within the Apache - * Beam SDK. Values are expected in JSON format <code>{"ApiName":"TraceDestination",...} - * </code> where the {@code ApiName} represents the request classes canonical name. The - * {@code TraceDestination} is a logical trace consumer to whom the trace will be reported. - * Typically, "producer" is the right destination to use: this makes API traces available to the - * team offering the API. Note that by enabling this option, the contents of the requests to and - * from Google Cloud services will be made available to Google. For example, by specifying - * <code>{"Dataflow":"producer"}</code>, all calls to the Dataflow service will be made available - * to Google, specifically to the Google Cloud Dataflow team. - */ - @Description("This option enables tracing of API calls to Google services used within the Apache " - + "Beam SDK. Values are expected in JSON format {\"ApiName\":\"TraceDestination\",...} " - + "where the ApiName represents the request classes canonical name. The TraceDestination is " - + "a logical trace consumer to whom the trace will be reported. Typically, \"producer\" is " - + "the right destination to use: this makes API traces available to the team offering the " - + "API. 
Note that by enabling this option, the contents of the requests to and from " - + "Google Cloud services will be made available to Google. For example, by specifying " - + "{\"Dataflow\":\"producer\"}, all calls to the Dataflow service will be made available to " - + "Google, specifically to the Google Cloud Dataflow team.") - GoogleApiTracer getGoogleApiTrace(); - void setGoogleApiTrace(GoogleApiTracer commands); - - /** - * A {@link GoogleClientRequestInitializer} that adds the trace destination to Google API calls. - */ - class GoogleApiTracer extends HashMap<String, String> - implements GoogleClientRequestInitializer { - /** - * Creates a {@link GoogleApiTracer} that sets the trace destination on all - * calls that match the given client type. - */ - public GoogleApiTracer addTraceFor(AbstractGoogleClient client, String traceDestination) { - put(client.getClass().getCanonicalName(), traceDestination); - return this; - } - - /** - * Creates a {@link GoogleApiTracer} that sets the trace {@code traceDestination} on all - * calls that match for the given request type. 
- */ - public GoogleApiTracer addTraceFor( - AbstractGoogleClientRequest<?> request, String traceDestination) { - put(request.getClass().getCanonicalName(), traceDestination); - return this; - } - - @Override - public void initialize(AbstractGoogleClientRequest<?> request) throws IOException { - for (Map.Entry<String, String> entry : this.entrySet()) { - if (request.getClass().getCanonicalName().contains(entry.getKey())) { - request.set("$trace", entry.getValue()); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 4e7bc89..88d6576 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -30,7 +30,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer; import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer; import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer; import org.apache.beam.sdk.runners.PipelineRunner; @@ -187,8 +186,7 @@ import org.joda.time.format.DateTimeFormatter; * <a href="https://github.com/FasterXML/jackson-annotations">annotations</a> to aid in * serialization of custom types. We point you to the public * <a href="https://github.com/FasterXML/jackson">Jackson documentation</a> when attempting - * to add serialization support for your custom types. 
See {@link GoogleApiTracer} for an - * example using the Jackson annotations to serialize and deserialize a custom type. + * to add serialization support for your custom types. * * <p>Note: It is an error to have the same property available in multiple interfaces with only * some of them being annotated with {@link JsonIgnore @JsonIgnore}. It is also an error to mark a http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java deleted file mode 100644 index b065d19..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PubsubOptions.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.options; - -/** - * Properties that can be set when using Google Cloud Pub/Sub with the Apache Beam SDK. - */ -@Description("Options that are used to configure Google Cloud Pub/Sub. 
See " - + "https://cloud.google.com/pubsub/docs/overview for details on Cloud Pub/Sub.") -public interface PubsubOptions extends ApplicationNameOptions, GcpOptions, - PipelineOptions, StreamingOptions { - - /** - * Root URL for use with the Google Cloud Pub/Sub API. - */ - @Description("Root URL for use with the Google Cloud Pub/Sub API") - @Default.String("https://pubsub.googleapis.com") - @Hidden - String getPubsubRootUrl(); - void setPubsubRootUrl(String value); -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java index 7b2fba3..a318dfc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.InstanceBuilder; @@ -41,6 +42,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> { */ public static PipelineRunner<? extends PipelineResult> fromOptions(PipelineOptions options) { checkNotNull(options); + PipelineOptionsValidator.validate(PipelineOptions.class, options); // (Re-)register standard IO factories. Clobbers any prior credentials. 
IOChannelUtils.registerIOFactoriesAllowOverride(options); http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java deleted file mode 100644 index 8f752c0..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.testing; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.api.client.http.HttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; -import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.BigqueryScopes; -import com.google.api.services.bigquery.model.QueryRequest; -import com.google.api.services.bigquery.model.QueryResponse; -import com.google.api.services.bigquery.model.TableCell; -import com.google.api.services.bigquery.model.TableRow; -import com.google.auth.Credentials; -import com.google.auth.http.HttpCredentialsAdapter; -import com.google.auth.oauth2.GoogleCredentials; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.hash.HashCode; -import com.google.common.hash.Hashing; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.nio.charset.StandardCharsets; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import javax.annotation.Nonnull; -import javax.annotation.concurrent.NotThreadSafe; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.util.FluentBackoff; -import org.apache.beam.sdk.util.Transport; -import org.hamcrest.Description; -import org.hamcrest.TypeSafeMatcher; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A matcher to verify data in BigQuery by processing given query - * and comparing with content's checksum. 
- * - * <p>Example: - * <pre>{@code [ - * assertThat(job, new BigqueryMatcher(appName, projectId, queryString, expectedChecksum)); - * ]}</pre> - */ -@NotThreadSafe -public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult> - implements SerializableMatcher<PipelineResult> { - private static final Logger LOG = LoggerFactory.getLogger(BigqueryMatcher.class); - - // The maximum number of retries to execute a BigQuery RPC - static final int MAX_QUERY_RETRIES = 4; - - // The initial backoff for executing a BigQuery RPC - private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(1L); - - // The total number of rows in query response to be formatted for debugging purpose - private static final int TOTAL_FORMATTED_ROWS = 20; - - // The backoff factory with initial configs - static final FluentBackoff BACKOFF_FACTORY = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_QUERY_RETRIES) - .withInitialBackoff(INITIAL_BACKOFF); - - private final String applicationName; - private final String projectId; - private final String query; - private final String expectedChecksum; - private String actualChecksum; - private transient QueryResponse response; - - public BigqueryMatcher( - String applicationName, String projectId, String query, String expectedChecksum) { - validateArgument("applicationName", applicationName); - validateArgument("projectId", projectId); - validateArgument("query", query); - validateArgument("expectedChecksum", expectedChecksum); - - this.applicationName = applicationName; - this.projectId = projectId; - this.query = query; - this.expectedChecksum = expectedChecksum; - } - - @Override - protected boolean matchesSafely(PipelineResult pipelineResult) { - LOG.info("Verifying Bigquery data"); - Bigquery bigqueryClient = newBigqueryClient(applicationName); - - // execute query - LOG.debug("Executing query: {}", query); - try { - QueryRequest queryContent = new QueryRequest(); - queryContent.setQuery(query); - - response = queryWithRetries( - 
bigqueryClient, queryContent, Sleeper.DEFAULT, BACKOFF_FACTORY.backoff()); - } catch (IOException | InterruptedException e) { - if (e instanceof InterruptedIOException) { - Thread.currentThread().interrupt(); - } - throw new RuntimeException("Failed to fetch BigQuery data.", e); - } - - if (!response.getJobComplete()) { - // query job not complete, verification failed - return false; - } else { - // compute checksum - actualChecksum = generateHash(response.getRows()); - LOG.debug("Generated a SHA1 checksum based on queried data: {}", actualChecksum); - - return expectedChecksum.equals(actualChecksum); - } - } - - @VisibleForTesting - Bigquery newBigqueryClient(String applicationName) { - HttpTransport transport = Transport.getTransport(); - JsonFactory jsonFactory = Transport.getJsonFactory(); - Credentials credential = getDefaultCredential(); - - return new Bigquery.Builder(transport, jsonFactory, new HttpCredentialsAdapter(credential)) - .setApplicationName(applicationName) - .build(); - } - - @Nonnull - @VisibleForTesting - QueryResponse queryWithRetries(Bigquery bigqueryClient, QueryRequest queryContent, - Sleeper sleeper, BackOff backOff) - throws IOException, InterruptedException { - IOException lastException = null; - do { - if (lastException != null) { - LOG.warn("Retrying query ({}) after exception", queryContent.getQuery(), lastException); - } - try { - QueryResponse response = bigqueryClient.jobs().query(projectId, queryContent).execute(); - if (response != null) { - return response; - } else { - lastException = - new IOException("Expected valid response from query job, but received null."); - } - } catch (IOException e) { - // ignore and retry - lastException = e; - } - } while(BackOffUtils.next(sleeper, backOff)); - - throw new RuntimeException( - String.format( - "Unable to get BigQuery response after retrying %d times using query (%s)", - MAX_QUERY_RETRIES, - queryContent.getQuery()), - lastException); - } - - private void validateArgument(String 
name, String value) { - checkArgument( - !Strings.isNullOrEmpty(value), "Expected valid %s, but was %s", name, value); - } - - private Credentials getDefaultCredential() { - GoogleCredentials credential; - try { - credential = GoogleCredentials.getApplicationDefault(); - } catch (IOException e) { - throw new RuntimeException("Failed to get application default credential.", e); - } - - if (credential.createScopedRequired()) { - Collection<String> bigqueryScope = - Lists.newArrayList(BigqueryScopes.CLOUD_PLATFORM_READ_ONLY); - credential = credential.createScoped(bigqueryScope); - } - return credential; - } - - private String generateHash(@Nonnull List<TableRow> rows) { - List<HashCode> rowHashes = Lists.newArrayList(); - for (TableRow row : rows) { - List<String> cellsInOneRow = Lists.newArrayList(); - for (TableCell cell : row.getF()) { - cellsInOneRow.add(Objects.toString(cell.getV())); - Collections.sort(cellsInOneRow); - } - rowHashes.add( - Hashing.sha1().hashString(cellsInOneRow.toString(), StandardCharsets.UTF_8)); - } - return Hashing.combineUnordered(rowHashes).toString(); - } - - @Override - public void describeTo(Description description) { - description - .appendText("Expected checksum is (") - .appendText(expectedChecksum) - .appendText(")"); - } - - @Override - public void describeMismatchSafely(PipelineResult pResult, Description description) { - String info; - if (!response.getJobComplete()) { - // query job not complete - info = String.format("The query job hasn't completed. 
Got response: %s", response); - } else { - // checksum mismatch - info = String.format("was (%s).%n" - + "\tTotal number of rows are: %d.%n" - + "\tQueried data details:%s", - actualChecksum, response.getTotalRows(), formatRows(TOTAL_FORMATTED_ROWS)); - } - description.appendText(info); - } - - private String formatRows(int totalNumRows) { - StringBuilder samples = new StringBuilder(); - List<TableRow> rows = response.getRows(); - for (int i = 0; i < totalNumRows && i < rows.size(); i++) { - samples.append(String.format("%n\t\t")); - for (TableCell field : rows.get(i).getF()) { - samples.append(String.format("%-10s", field.getV())); - } - } - if (rows.size() > totalNumRows) { - samples.append(String.format("%n\t\t...")); - } - return samples.toString(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index 3d3de51..1273442 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -42,14 +42,12 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.ApplicationNameOptions; -import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.util.IOChannelUtils; -import org.apache.beam.sdk.util.TestCredential; import 
org.junit.experimental.categories.Category; import org.junit.rules.TestRule; import org.junit.runner.Description; @@ -400,7 +398,6 @@ public class TestPipeline extends Pipeline implements TestRule { if (!Strings.isNullOrEmpty(useDefaultDummy) && Boolean.valueOf(useDefaultDummy)) { options.setRunner(CrashingRunner.class); } - options.as(GcpOptions.class).setGcpCredential(new TestCredential()); } options.setStableUniqueNames(CheckEnabled.ERROR); http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java deleted file mode 100644 index b0fcbd1..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppEngineEnvironment.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import java.lang.reflect.InvocationTargetException; - -/** Stores whether we are running within AppEngine or not. 
*/ -public class AppEngineEnvironment { - /** - * True if running inside of AppEngine, false otherwise. - */ - @Deprecated - public static final boolean IS_APP_ENGINE = isAppEngine(); - - /** - * Attempts to detect whether we are inside of AppEngine. - * - * <p>Purposely copied and left private from private <a href="https://code.google.com/p/ - * guava-libraries/source/browse/guava/src/com/google/common/util/concurrent/ - * MoreExecutors.java#785">code.google.common.util.concurrent.MoreExecutors#isAppEngine</a>. - * - * @return true if we are inside of AppEngine, false otherwise. - */ - static boolean isAppEngine() { - if (System.getProperty("com.google.appengine.runtime.environment") == null) { - return false; - } - try { - // If the current environment is null, we're not inside AppEngine. - return Class.forName("com.google.apphosting.api.ApiProxy") - .getMethod("getCurrentEnvironment") - .invoke(null) != null; - } catch (ClassNotFoundException e) { - // If ApiProxy doesn't exist, we're not on AppEngine at all. - return false; - } catch (InvocationTargetException e) { - // If ApiProxy throws an exception, we're not in a proper AppEngine environment. 
- return false; - } catch (IllegalAccessException e) { - // If the method isn't accessible, we're not on a supported version of AppEngine; - return false; - } catch (NoSuchMethodException e) { - // If the method doesn't exist, we're not on a supported version of AppEngine; - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java deleted file mode 100644 index 6229650..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.auth.Credentials; -import java.io.IOException; -import java.security.GeneralSecurityException; - -/** - * Construct an oauth credential to be used by the SDK and the SDK workers. 
- */ -public interface CredentialFactory { - Credentials getCredential() throws IOException, GeneralSecurityException; -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java deleted file mode 100644 index 75954c0..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Strings.isNullOrEmpty; - -import com.google.api.services.storage.model.Bucket; -import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.nio.file.FileAlreadyExistsException; -import org.apache.beam.sdk.options.CloudResourceManagerOptions; -import org.apache.beam.sdk.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utility class for handling default GCS buckets. - */ -public class DefaultBucket { - static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class); - - static final String DEFAULT_REGION = "us-central1"; - - /** - * Creates a default bucket or verifies the existence and proper access control - * of an existing default bucket. Returns the location if successful. - */ - public static String tryCreateDefaultBucket(PipelineOptions options) { - GcsOptions gcpOptions = options.as(GcsOptions.class); - - final String projectId = gcpOptions.getProject(); - checkArgument(!isNullOrEmpty(projectId), - "--project is a required option."); - - // Look up the project number, to create a default bucket with a stable - // name with no special characters. 
- long projectNumber = 0L; - try { - projectNumber = gcpOptions.as(CloudResourceManagerOptions.class) - .getGcpProjectUtil().getProjectNumber(projectId); - } catch (IOException e) { - throw new RuntimeException("Unable to verify project with ID " + projectId, e); - } - String region = DEFAULT_REGION; - if (!isNullOrEmpty(gcpOptions.getZone())) { - region = getRegionFromZone(gcpOptions.getZone()); - } - final String bucketName = - "dataflow-staging-" + region + "-" + projectNumber; - LOG.info("No staging location provided, attempting to use default bucket: {}", - bucketName); - Bucket bucket = new Bucket() - .setName(bucketName) - .setLocation(region); - // Always try to create the bucket before checking access, so that we do not - // race with other pipelines that may be attempting to do the same thing. - try { - gcpOptions.getGcsUtil().createBucket(projectId, bucket); - } catch (FileAlreadyExistsException e) { - LOG.debug("Bucket '{}'' already exists, verifying access.", bucketName); - } catch (IOException e) { - throw new RuntimeException("Unable create default bucket.", e); - } - - // Once the bucket is expected to exist, verify that it is correctly owned - // by the project executing the job. - try { - long owner = gcpOptions.getGcsUtil().bucketOwner( - GcsPath.fromComponents(bucketName, "")); - checkArgument( - owner == projectNumber, - "Bucket owner does not match the project from --project:" - + " %s vs. 
%s", owner, projectNumber); - } catch (IOException e) { - throw new RuntimeException( - "Unable to determine the owner of the default bucket at gs://" + bucketName, e); - } - return "gs://" + bucketName; - } - - @VisibleForTesting - static String getRegionFromZone(String zone) { - String[] zoneParts = zone.split("-"); - checkArgument(zoneParts.length >= 2, "Invalid zone provided: %s", zone); - return zoneParts[0] + "-" + zoneParts[1]; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java deleted file mode 100644 index e1fa18f..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import com.google.auth.Credentials; -import com.google.auth.oauth2.GoogleCredentials; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.options.PipelineOptions; - -/** - * Construct an oauth credential to be used by the SDK and the SDK workers. - * Returns a GCP credential. - */ -public class GcpCredentialFactory implements CredentialFactory { - /** - * The scope cloud-platform provides access to all Cloud Platform resources. - * cloud-platform isn't sufficient yet for talking to datastore so we request - * those resources separately. - * - * <p>Note that trusted scope relationships don't apply to OAuth tokens, so for - * services we access directly (GCS) as opposed to through the backend - * (BigQuery, GCE), we need to explicitly request that scope. - */ - private static final List<String> SCOPES = Arrays.asList( - "https://www.googleapis.com/auth/cloud-platform", - "https://www.googleapis.com/auth/devstorage.full_control", - "https://www.googleapis.com/auth/userinfo.email", - "https://www.googleapis.com/auth/datastore", - "https://www.googleapis.com/auth/pubsub"); - - private static final GcpCredentialFactory INSTANCE = new GcpCredentialFactory(); - - public static GcpCredentialFactory fromOptions(PipelineOptions options) { - return INSTANCE; - } - - /** - * Returns a default GCP {@link Credentials} or null when it fails. - */ - @Override - public Credentials getCredential() { - try { - return GoogleCredentials.getApplicationDefault().createScoped(SCOPES); - } catch (IOException e) { - // Ignore the exception - // Pipelines that only access to public data should be able to run without credentials. 
- return null; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java deleted file mode 100644 index f73afe0..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import com.google.api.client.util.BackOff; -import com.google.api.client.util.Sleeper; -import com.google.api.services.cloudresourcemanager.CloudResourceManager; -import com.google.api.services.cloudresourcemanager.model.Project; -import com.google.cloud.hadoop.util.ResilientOperation; -import com.google.cloud.hadoop.util.RetryDeterminer; -import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import org.apache.beam.sdk.options.CloudResourceManagerOptions; -import org.apache.beam.sdk.options.DefaultValueFactory; -import org.apache.beam.sdk.options.PipelineOptions; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Provides operations on Google Cloud Platform Projects. - */ -public class GcpProjectUtil { - /** - * A {@link DefaultValueFactory} able to create a {@link GcpProjectUtil} using - * any transport flags specified on the {@link PipelineOptions}. - */ - public static class GcpProjectUtilFactory implements DefaultValueFactory<GcpProjectUtil> { - /** - * Returns an instance of {@link GcpProjectUtil} based on the - * {@link PipelineOptions}. - */ - @Override - public GcpProjectUtil create(PipelineOptions options) { - LOG.debug("Creating new GcpProjectUtil"); - CloudResourceManagerOptions crmOptions = options.as(CloudResourceManagerOptions.class); - return new GcpProjectUtil( - Transport.newCloudResourceManagerClient(crmOptions).build()); - } - } - - private static final Logger LOG = LoggerFactory.getLogger(GcpProjectUtil.class); - - private static final FluentBackoff BACKOFF_FACTORY = - FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200)); - - /** Client for the CRM API. */ - private CloudResourceManager crmClient; - - private GcpProjectUtil(CloudResourceManager crmClient) { - this.crmClient = crmClient; - } - - // Use this only for testing purposes. 
- @VisibleForTesting - void setCrmClient(CloudResourceManager crmClient) { - this.crmClient = crmClient; - } - - /** - * Returns the project number or throws an exception if the project does not - * exist or has other access exceptions. - */ - public long getProjectNumber(String projectId) throws IOException { - return getProjectNumber( - projectId, - BACKOFF_FACTORY.backoff(), - Sleeper.DEFAULT); - } - - /** - * Returns the project number or throws an error if the project does not - * exist or has other access errors. - */ - @VisibleForTesting - long getProjectNumber(String projectId, BackOff backoff, Sleeper sleeper) throws IOException { - CloudResourceManager.Projects.Get getProject = - crmClient.projects().get(projectId); - try { - Project project = ResilientOperation.retry( - ResilientOperation.getGoogleRequestCallable(getProject), - backoff, - RetryDeterminer.SOCKET_ERRORS, - IOException.class, - sleeper); - return project.getProjectNumber(); - } catch (Exception e) { - throw new IOException("Unable to get project number", e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java deleted file mode 100644 index 745dcb9..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import java.io.IOException; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; -import java.nio.file.Path; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import org.apache.beam.sdk.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.gcsfs.GcsPath; - -/** - * Implements IOChannelFactory for GCS. - */ -public class GcsIOChannelFactory implements IOChannelFactory { - - /** - * Create a {@link GcsIOChannelFactory} with the given {@link PipelineOptions}. 
- */ - public static GcsIOChannelFactory fromOptions(PipelineOptions options) { - return new GcsIOChannelFactory(options.as(GcsOptions.class)); - } - - private final GcsOptions options; - - private GcsIOChannelFactory(GcsOptions options) { - this.options = options; - } - - @Override - public Collection<String> match(String spec) throws IOException { - GcsPath path = GcsPath.fromUri(spec); - GcsUtil util = options.getGcsUtil(); - List<GcsPath> matched = util.expand(path); - - List<String> specs = new LinkedList<>(); - for (GcsPath match : matched) { - specs.add(match.toString()); - } - - return specs; - } - - @Override - public ReadableByteChannel open(String spec) throws IOException { - GcsPath path = GcsPath.fromUri(spec); - GcsUtil util = options.getGcsUtil(); - return util.open(path); - } - - @Override - public WritableByteChannel create(String spec, String mimeType) - throws IOException { - GcsPath path = GcsPath.fromUri(spec); - GcsUtil util = options.getGcsUtil(); - return util.create(path, mimeType); - } - - @Override - public long getSizeBytes(String spec) throws IOException { - GcsPath path = GcsPath.fromUri(spec); - GcsUtil util = options.getGcsUtil(); - return util.fileSize(path); - } - - @Override - public boolean isReadSeekEfficient(String spec) throws IOException { - // TODO It is incorrect to return true here for files with content encoding set to gzip. 
- return true; - } - - @Override - public String resolve(String path, String other) throws IOException { - return toPath(path).resolve(other).toString(); - } - - @Override - public Path toPath(String path) { - return GcsPath.fromUri(path); - } - - @Override - public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames) - throws IOException { - options.getGcsUtil().copy(srcFilenames, destFilenames); - } - - @Override - public void remove(Collection<String> filesOrDirs) throws IOException { - options.getGcsUtil().remove(filesOrDirs); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java deleted file mode 100644 index b4c457f..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import com.google.auto.service.AutoService; -import org.apache.beam.sdk.options.PipelineOptions; - -/** - * {@link AutoService} registrar for the {@link GcsIOChannelFactory}. - */ -@AutoService(IOChannelFactoryRegistrar.class) -public class GcsIOChannelFactoryRegistrar implements IOChannelFactoryRegistrar { - - @Override - public GcsIOChannelFactory fromOptions(PipelineOptions options) { - return GcsIOChannelFactory.fromOptions(options); - } - - @Override - public String getScheme() { - return "gs"; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java deleted file mode 100644 index a5b951d..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.io.IOException; -import org.apache.beam.sdk.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.gcsfs.GcsPath; - -/** - * GCP implementation of {@link PathValidator}. Only GCS paths are allowed. - */ -public class GcsPathValidator implements PathValidator { - - private GcsOptions gcpOptions; - - private GcsPathValidator(GcsOptions options) { - this.gcpOptions = options; - } - - public static GcsPathValidator fromOptions(PipelineOptions options) { - return new GcsPathValidator(options.as(GcsOptions.class)); - } - - /** - * Validates the the input GCS path is accessible and that the path - * is well formed. - */ - @Override - public String validateInputFilePatternSupported(String filepattern) { - GcsPath gcsPath = getGcsPath(filepattern); - checkArgument(gcpOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject())); - String returnValue = verifyPath(filepattern); - verifyPathIsAccessible(filepattern, "Could not find file %s"); - return returnValue; - } - - /** - * Validates the the output GCS path is accessible and that the path - * is well formed. 
- */ - @Override - public String validateOutputFilePrefixSupported(String filePrefix) { - String returnValue = verifyPath(filePrefix); - verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s"); - return returnValue; - } - - @Override - public String verifyPath(String path) { - GcsPath gcsPath = getGcsPath(path); - checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow"); - checkArgument(!gcsPath.getObject().contains("//"), - "Dataflow Service does not allow objects with consecutive slashes"); - return gcsPath.toResourceName(); - } - - private void verifyPathIsAccessible(String path, String errorMessage) { - GcsPath gcsPath = getGcsPath(path); - try { - checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath), - errorMessage, path); - } catch (IOException e) { - throw new RuntimeException( - String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()), - e); - } - } - - private GcsPath getGcsPath(String path) { - try { - return GcsPath.fromUri(path); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException(String.format( - "Expected a valid 'gs://' path but was given '%s'", path), e); - } - } -}
