[BEAM-1871] Move several options/auth classes around in gcp-core Note that I kept a duplicate of GcsOptions because of its direct usage within the Dataflow Worker
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0aed801a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0aed801a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0aed801a Branch: refs/heads/master Commit: 0aed801acb1dd0709d4dc0dc9ad9b94ca1c882d2 Parents: 0c740f4 Author: Luke Cwik <[email protected]> Authored: Wed Apr 26 18:23:23 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Thu Apr 27 10:50:45 2017 -0700 ---------------------------------------------------------------------- .../common/ExampleBigQueryTableOptions.java | 2 +- ...xamplePubsubTopicAndSubscriptionOptions.java | 2 +- .../common/ExamplePubsubTopicOptions.java | 2 +- .../beam/examples/common/ExampleUtils.java | 2 +- .../apache/beam/examples/complete/TfIdf.java | 2 +- .../complete/game/utils/WriteToBigQuery.java | 2 +- .../examples/MinimalWordCountJava8Test.java | 2 +- pom.xml | 7 + runners/google-cloud-dataflow-java/pom.xml | 7 + .../options/DataflowPipelineOptions.java | 4 +- .../dataflow/util/DataflowTransport.java | 2 +- .../BatchStatefulParDoOverridesTest.java | 2 +- .../runners/dataflow/DataflowMetricsTest.java | 2 +- .../dataflow/DataflowPipelineJobTest.java | 2 +- .../DataflowPipelineTranslatorTest.java | 2 +- .../runners/dataflow/DataflowRunnerTest.java | 4 +- .../testing/TestDataflowRunnerTest.java | 2 +- .../dataflow/util/MonitoringUtilTest.java | 2 +- .../runners/dataflow/util/PackageUtilTest.java | 2 +- .../extensions/gcp/auth/CredentialFactory.java | 29 +++ .../gcp/auth/GcpCredentialFactory.java | 67 ++++++ .../gcp/auth/NoopCredentialFactory.java | 68 ++++++ .../gcp/auth/NullCredentialInitializer.java | 62 +++++ .../sdk/extensions/gcp/auth/package-info.java | 22 ++ .../options/CloudResourceManagerOptions.java | 46 ++++ .../sdk/extensions/gcp/options/GcpOptions.java | 231 +++++++++++++++++++ .../options/GcpPipelineOptionsRegistrar.java | 39 ++++ .../sdk/extensions/gcp/options/GcsOptions.java | 160 +++++++++++++ .../gcp/options/GoogleApiDebugOptions.java | 89 +++++++ .../extensions/gcp/options/package-info.java | 22 ++ .../options/CloudResourceManagerOptions.java | 40 ---- .../org/apache/beam/sdk/options/GcpOptions.java | 227 ------------------ .../options/GcpPipelineOptionsRegistrar.java | 37 --- .../org/apache/beam/sdk/options/GcsOptions.java | 56 +---- .../beam/sdk/options/GoogleApiDebugOptions.java | 87 ------- .../apache/beam/sdk/util/CredentialFactory.java | 29 --- .../org/apache/beam/sdk/util/DefaultBucket.java | 4 +- .../beam/sdk/util/GcpCredentialFactory.java | 67 ------ .../apache/beam/sdk/util/GcpProjectUtil.java | 2 +- .../beam/sdk/util/GcsIOChannelFactory.java | 2 +- .../apache/beam/sdk/util/GcsPathValidator.java | 2 +- .../java/org/apache/beam/sdk/util/GcsUtil.java | 3 +- .../beam/sdk/util/NoopCredentialFactory.java | 68 ------ .../sdk/util/NullCredentialInitializer.java | 62 ----- .../apache/beam/sdk/util/TestCredential.java | 59 ----- .../org/apache/beam/sdk/util/Transport.java | 5 +- .../org/apache/beam/GcpCoreApiSurfaceTest.java | 60 ----- .../extensions/gcp/GcpCoreApiSurfaceTest.java | 60 +++++ .../sdk/extensions/gcp/auth/TestCredential.java | 59 +++++ .../extensions/gcp/options/GcpOptionsTest.java | 172 ++++++++++++++ .../gcp/options/GoogleApiDebugOptionsTest.java | 147 ++++++++++++ .../apache/beam/sdk/options/GcpOptionsTest.java | 171 -------------- .../sdk/options/GoogleApiDebugOptionsTest.java | 146 ------------ .../apache/beam/sdk/util/DefaultBucketTest.java | 6 +- .../beam/sdk/util/GcpProjectUtilTest.java | 3 +- .../beam/sdk/util/GcsIOChannelFactoryTest.java | 2 +- .../beam/sdk/util/GcsPathValidatorTest.java | 3 +- .../org/apache/beam/sdk/util/GcsUtilTest.java | 3 +- .../org/apache/beam/fn/harness/FnHarness.java | 2 +- .../fn/harness/logging/BeamFnLoggingClient.java | 2 +- .../harness/stream/StreamObserverFactory.java | 2 +- .../apache/beam/fn/harness/FnHarnessTest.java | 2 +- sdks/java/io/google-cloud-platform/pom.xml | 7 + .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 +- .../sdk/io/gcp/bigquery/BigQueryOptions.java | 2 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 4 +- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 2 +- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 2 +- .../sdk/io/gcp/pubsub/PubsubGrpcClient.java | 2 +- .../beam/sdk/io/gcp/pubsub/PubsubOptions.java | 2 +- .../beam/sdk/io/gcp/storage/GcsFileSystem.java | 2 +- .../io/gcp/storage/GcsFileSystemRegistrar.java | 2 +- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 4 +- .../sdk/io/gcp/bigtable/BigtableWriteIT.java | 2 +- .../beam/sdk/io/gcp/datastore/V1ReadIT.java | 2 +- .../beam/sdk/io/gcp/datastore/V1TestUtil.java | 2 +- .../beam/sdk/io/gcp/datastore/V1WriteIT.java | 2 +- .../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java | 2 +- .../sdk/io/gcp/storage/GcsFileSystemTest.java | 2 +- 79 files changed, 1360 insertions(+), 1160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java index 54502cb..c5216e6 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleBigQueryTableOptions.java @@ -18,10 +18,10 @@ package org.apache.beam.examples.common; import com.google.api.services.bigquery.model.TableSchema; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; /** http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java index c64681c..6cffad2 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicAndSubscriptionOptions.java @@ -17,10 +17,10 @@ */ package org.apache.beam.examples.common; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; /** http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java index 65594d7..b594a66 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExamplePubsubTopicOptions.java @@ -17,10 +17,10 @@ */ package org.apache.beam.examples.common; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; /** http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java index 2650f8e..6ac37fd 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java @@ -47,11 +47,11 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.FluentBackoff; -import org.apache.beam.sdk.util.NullCredentialInitializer; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.util.Transport; import org.joda.time.Duration; http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java index f7904d3..2e1be90 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java @@ -28,10 +28,10 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringDelegateCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java index 5eecddb..f767d21 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java @@ -27,10 +27,10 @@ import java.util.List; import java.util.Map; import org.apache.beam.examples.complete.game.UserScore; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; -import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java index 6c66d8f..f3becf9 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java @@ -26,8 +26,8 @@ import java.nio.file.Files; import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.List; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Filter; http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 52d2b40..f96c14d 100644 --- a/pom.xml +++ b/pom.xml @@ -372,6 +372,13 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-gcp-core</artifactId> + <classifier>tests</classifier> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-sorter</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 75aac43..e95f4fc 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -377,6 +377,13 @@ </dependency> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-gcp-core</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> <groupId>com.google.cloud.dataflow</groupId> <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index 1c3891e..0796b6d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -20,14 +20,14 @@ package org.apache.beam.runners.dataflow.options; import java.io.IOException; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.GcpOptions; -import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java index 7f3b6c7..b28b8d3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java @@ -30,7 +30,7 @@ import com.google.common.collect.ImmutableList; import java.net.MalformedURLException; import java.net.URL; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.sdk.util.NullCredentialInitializer; +import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; /** http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java index f995ff3..ce7f678 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java @@ -38,13 +38,13 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.TestCredential; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.util.state.StateSpecs; http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index ddb719c..aabdd84 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -39,10 +39,10 @@ import java.io.IOException; import java.math.BigDecimal; import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.util.NoopPathValidator; -import org.apache.beam.sdk.util.TestCredential; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 9dd2ab1..e1235b9 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -45,13 +45,13 @@ import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.MonitoringUtil; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.NoopPathValidator; -import org.apache.beam.sdk.util.TestCredential; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 8185623..cf0cae4 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -65,6 +65,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; @@ -80,7 +81,6 @@ import org.apache.beam.sdk.util.GcsPathValidator; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.Structs; -import org.apache.beam.sdk.util.TestCredential; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.util.state.StateSpec; http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 36704bc..433fb77 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -66,6 +66,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.TextIO.Read; import org.apache.beam.sdk.options.PipelineOptions; @@ -79,10 +81,8 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.NoopCredentialFactory; import org.apache.beam.sdk.util.NoopPathValidator; import org.apache.beam.sdk.util.ReleaseInfo; -import org.apache.beam.sdk.util.TestCredential; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java index 307393c..80fbfe5 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -53,6 +53,7 @@ import org.apache.beam.runners.dataflow.util.TimeUtil; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SerializableMatcher; @@ -60,7 +61,6 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.util.NoopPathValidator; -import org.apache.beam.sdk.util.TestCredential; import org.apache.beam.sdk.util.Transport; import org.apache.beam.sdk.values.PCollection; import org.hamcrest.BaseMatcher; http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java index 24b6c4e..c048776 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java @@ -31,9 +31,9 @@ import org.apache.beam.runners.dataflow.DataflowClient; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.MonitoringUtil.LoggingHandler; import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; -import org.apache.beam.sdk.util.TestCredential; import org.joda.time.DateTime; import org.joda.time.Instant; import org.joda.time.chrono.ISOChronology; http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index a11872f..877832c 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -69,7 +69,7 @@ import java.util.List; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes; -import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java new file mode 100644 index 0000000..6ab7b14 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/CredentialFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.auth; + +import com.google.auth.Credentials; +import java.io.IOException; +import java.security.GeneralSecurityException; + +/** + * Construct an oauth credential to be used by the SDK and the SDK workers. + */ +public interface CredentialFactory { + Credentials getCredential() throws IOException, GeneralSecurityException; +} http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java new file mode 100644 index 0000000..f999c63 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/GcpCredentialFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.auth; + +import com.google.auth.Credentials; +import com.google.auth.oauth2.GoogleCredentials; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Construct an oauth credential to be used by the SDK and the SDK workers. + * Returns a GCP credential. + */ +public class GcpCredentialFactory implements CredentialFactory { + /** + * The scope cloud-platform provides access to all Cloud Platform resources. + * cloud-platform isn't sufficient yet for talking to datastore so we request + * those resources separately. + * + * <p>Note that trusted scope relationships don't apply to OAuth tokens, so for + * services we access directly (GCS) as opposed to through the backend + * (BigQuery, GCE), we need to explicitly request that scope. + */ + private static final List<String> SCOPES = Arrays.asList( + "https://www.googleapis.com/auth/cloud-platform", + "https://www.googleapis.com/auth/devstorage.full_control", + "https://www.googleapis.com/auth/userinfo.email", + "https://www.googleapis.com/auth/datastore", + "https://www.googleapis.com/auth/pubsub"); + + private static final GcpCredentialFactory INSTANCE = new GcpCredentialFactory(); + + public static GcpCredentialFactory fromOptions(PipelineOptions options) { + return INSTANCE; + } + + /** + * Returns a default GCP {@link Credentials} or null when it fails. + */ + @Override + public Credentials getCredential() { + try { + return GoogleCredentials.getApplicationDefault().createScoped(SCOPES); + } catch (IOException e) { + // Ignore the exception + // Pipelines that only access to public data should be able to run without credentials. + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java new file mode 100644 index 0000000..4355a10 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NoopCredentialFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.auth; + +import com.google.auth.Credentials; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Construct an oauth credential to be used by the SDK and the SDK workers. + * Always returns a null Credential object. + */ +public class NoopCredentialFactory implements CredentialFactory { + private static final NoopCredentialFactory INSTANCE = new NoopCredentialFactory(); + private static final NoopCredentials NOOP_CREDENTIALS = new NoopCredentials(); + + public static NoopCredentialFactory fromOptions(PipelineOptions options) { + return INSTANCE; + } + + @Override + public Credentials getCredential() throws IOException { + return NOOP_CREDENTIALS; + } + + private static class NoopCredentials extends Credentials { + @Override + public String getAuthenticationType() { + return null; + } + + @Override + public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException { + return null; + } + + @Override + public boolean hasRequestMetadata() { + return false; + } + + @Override + public boolean hasRequestMetadataOnly() { + return false; + } + + @Override + public void refresh() throws IOException {} + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java new file mode 100644 index 0000000..00306f2 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/NullCredentialInitializer.java @@ -0,0 +1,62 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.beam.sdk.extensions.gcp.auth; + +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpUnsuccessfulResponseHandler; +import java.io.IOException; + +/** + * A {@link HttpRequestInitializer} for requests that don't have credentials. + * + * <p>When the access is denied, it throws {@link IOException} with a detailed error message. + */ +public class NullCredentialInitializer implements HttpRequestInitializer { + private static final int ACCESS_DENIED = 401; + private static final String NULL_CREDENTIAL_REASON = + "Unable to get application default credentials. Please see " + + "https://developers.google.com/accounts/docs/application-default-credentials " + + "for details on how to specify credentials. This version of the SDK is " + + "dependent on the gcloud core component version 2015.02.05 or newer to " + + "be able to get credentials from the currently authorized user via gcloud auth."; + + @Override + public void initialize(HttpRequest httpRequest) throws IOException { + httpRequest.setUnsuccessfulResponseHandler(new NullCredentialHttpUnsuccessfulResponseHandler()); + } + + private static class NullCredentialHttpUnsuccessfulResponseHandler + implements HttpUnsuccessfulResponseHandler { + + @Override + public boolean handleResponse( + HttpRequest httpRequest, + HttpResponse httpResponse, boolean supportsRetry) throws IOException { + if (!httpResponse.isSuccessStatusCode() && httpResponse.getStatusCode() == ACCESS_DENIED) { + throwNullCredentialException(); + } + return supportsRetry; + } + } + + public static void throwNullCredentialException() { + throw new RuntimeException(NULL_CREDENTIAL_REASON); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java new file mode 100644 index 0000000..3d77bf2 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/auth/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines classes related to interacting with {@link com.google.auth.Credentials} for + * pipeline creation and execution containing Google Cloud Platform components. + */ +package org.apache.beam.sdk.extensions.gcp.auth; http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java new file mode 100644 index 0000000..68432cf --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.options; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.beam.sdk.options.ApplicationNameOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.Hidden; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.util.GcpProjectUtil; + +/** + * Properties needed when using Google CloudResourceManager with the Apache Beam SDK. + */ +@Description("Options that are used to configure Google CloudResourceManager. See " + + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.") +public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions, + PipelineOptions, StreamingOptions { + /** + * The GcpProjectUtil instance that should be used to communicate with Google Cloud Storage. + */ + @JsonIgnore + @Description("The GcpProjectUtil instance that should be used to communicate" + + " with Google Cloud Resource Manager.") + @Default.InstanceFactory(GcpProjectUtil.GcpProjectUtilFactory.class) + @Hidden + GcpProjectUtil getGcpProjectUtil(); + void setGcpProjectUtil(GcpProjectUtil value); +} http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java new file mode 100644 index 0000000..09904b6 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.options; + +import static com.google.common.base.Strings.isNullOrEmpty; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.auth.Credentials; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.GeneralSecurityException; +import java.util.Locale; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nullable; +import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory; +import org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.DefaultBucket; +import org.apache.beam.sdk.util.InstanceBuilder; +import org.apache.beam.sdk.util.PathValidator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Options used to configure Google Cloud Platform specific options such as the project + * and credentials. + * + * <p>These options defer to the + * <a href="https://developers.google.com/accounts/docs/application-default-credentials"> + * application default credentials</a> for authentication. See the + * <a href="https://github.com/google/google-auth-library-java">Google Auth Library</a> for + * alternative mechanisms for creating credentials. + */ +@Description("Options used to configure Google Cloud Platform project and credentials.") +public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { + /** + * Project id to use when launching jobs. + */ + @Description("Project id. Required when using Google Cloud Platform services. " + + "See https://cloud.google.com/storage/docs/projects for further details.") + @Default.InstanceFactory(DefaultProjectFactory.class) + String getProject(); + void setProject(String value); + + /** + * GCP <a href="https://developers.google.com/compute/docs/zones" + * >availability zone</a> for operations. + * + * <p>Default is set on a per-service basis. + */ + @Description("GCP availability zone for running GCP operations. " + + "Default is up to the individual service.") + String getZone(); + void setZone(String value); + + /** + * The class of the credential factory that should be created and used to create + * credentials. If gcpCredential has not been set explicitly, an instance of this class will + * be constructed and used as a credential factory. + */ + @Description("The class of the credential factory that should be created and used to create " + + "credentials. If gcpCredential has not been set explicitly, an instance of this class will " + + "be constructed and used as a credential factory.") + @Default.Class(GcpCredentialFactory.class) + Class<? extends CredentialFactory> getCredentialFactoryClass(); + void setCredentialFactoryClass( + Class<? extends CredentialFactory> credentialFactoryClass); + + /** + * The credential instance that should be used to authenticate against GCP services. + * If no credential has been set explicitly, the default is to use the instance factory + * that constructs a credential based upon the currently set credentialFactoryClass. + */ + @JsonIgnore + @Description("The credential instance that should be used to authenticate against GCP services. " + + "If no credential has been set explicitly, the default is to use the instance factory " + + "that constructs a credential based upon the currently set credentialFactoryClass.") + @Default.InstanceFactory(GcpUserCredentialsFactory.class) + Credentials getGcpCredential(); + void setGcpCredential(Credentials value); + + /** + * Attempts to infer the default project based upon the environment this application + * is executing within. Currently this only supports getting the default project from gcloud. + */ + class DefaultProjectFactory implements DefaultValueFactory<String> { + private static final Logger LOG = LoggerFactory.getLogger(DefaultProjectFactory.class); + + @Override + public String create(PipelineOptions options) { + try { + File configFile; + if (getEnvironment().containsKey("CLOUDSDK_CONFIG")) { + configFile = new File(getEnvironment().get("CLOUDSDK_CONFIG"), "properties"); + } else if (isWindows() && getEnvironment().containsKey("APPDATA")) { + configFile = new File(getEnvironment().get("APPDATA"), "gcloud/properties"); + } else { + // New versions of gcloud use this file + configFile = new File( + System.getProperty("user.home"), + ".config/gcloud/configurations/config_default"); + if (!configFile.exists()) { + // Old versions of gcloud use this file + configFile = new File(System.getProperty("user.home"), ".config/gcloud/properties"); + } + } + String section = null; + Pattern projectPattern = Pattern.compile("^project\\s*=\\s*(.*)$"); + Pattern sectionPattern = Pattern.compile("^\\[(.*)\\]$"); + for (String line : Files.readLines(configFile, StandardCharsets.UTF_8)) { + line = line.trim(); + if (line.isEmpty() || line.startsWith(";")) { + continue; + } + Matcher matcher = sectionPattern.matcher(line); + if (matcher.matches()) { + section = matcher.group(1); + } else if (section == null || section.equals("core")) { + matcher = projectPattern.matcher(line); + if (matcher.matches()) { + String project = matcher.group(1).trim(); + LOG.info("Inferred default GCP project '{}' from gcloud. If this is the incorrect " + + "project, please cancel this Pipeline and specify the command-line " + + "argument --project.", project); + return project; + } + } + } + } catch (IOException expected) { + LOG.debug("Failed to find default project.", expected); + } + // return null if can't determine + return null; + } + + /** + * Returns true if running on the Windows OS. + */ + private static boolean isWindows() { + return System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("windows"); + } + + /** + * Used to mock out getting environment variables. + */ + @VisibleForTesting + Map<String, String> getEnvironment() { + return System.getenv(); + } + } + + /** + * Attempts to load the GCP credentials. See + * {@link CredentialFactory#getCredential()} for more details. + */ + class GcpUserCredentialsFactory implements DefaultValueFactory<Credentials> { + @Override + public Credentials create(PipelineOptions options) { + GcpOptions gcpOptions = options.as(GcpOptions.class); + try { + CredentialFactory factory = InstanceBuilder.ofType(CredentialFactory.class) + .fromClass(gcpOptions.getCredentialFactoryClass()) + .fromFactoryMethod("fromOptions") + .withArg(PipelineOptions.class, options) + .build(); + return factory.getCredential(); + } catch (IOException | GeneralSecurityException e) { + throw new RuntimeException("Unable to obtain credential", e); + } + } + } + + /** + * A GCS path for storing temporary files in GCP. + * + * <p>Its default to {@link PipelineOptions#getTempLocation}. + */ + @Description("A GCS path for storing temporary files in GCP.") + @Default.InstanceFactory(GcpTempLocationFactory.class) + @Nullable String getGcpTempLocation(); + void setGcpTempLocation(String value); + + /** + * Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location. + */ + class GcpTempLocationFactory implements DefaultValueFactory<String> { + + @Override + @Nullable + public String create(PipelineOptions options) { + String tempLocation = options.getTempLocation(); + if (isNullOrEmpty(tempLocation)) { + tempLocation = DefaultBucket.tryCreateDefaultBucket(options); + options.setTempLocation(tempLocation); + } else { + try { + PathValidator validator = options.as(GcsOptions.class).getPathValidator(); + validator.validateOutputFilePrefixSupported(tempLocation); + } catch (Exception e) { + throw new IllegalArgumentException(String.format( + "Error constructing default value for gcpTempLocation: tempLocation is not" + + " a valid GCS path, %s. ", tempLocation), e); + } + } + return tempLocation; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java new file mode 100644 index 0000000..afc3416 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpPipelineOptionsRegistrar.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.gcp.options; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; + +/** + * A registrar containing the default GCP options. + */ +@AutoService(PipelineOptionsRegistrar.class) +public class GcpPipelineOptionsRegistrar implements PipelineOptionsRegistrar { + @Override + public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { + return ImmutableList.<Class<? extends PipelineOptions>>builder() + .add(GcpOptions.class) + .add(GcsOptions.class) + .add(GoogleApiDebugOptions.class) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java new file mode 100644 index 0000000..954092c --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.options; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.beam.sdk.options.ApplicationNameOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.Hidden; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.GcsPathValidator; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.InstanceBuilder; +import org.apache.beam.sdk.util.PathValidator; + +/** + * Options used to configure Google Cloud Storage. + */ +public interface GcsOptions extends + ApplicationNameOptions, GcpOptions, PipelineOptions { + /** + * The GcsUtil instance that should be used to communicate with Google Cloud Storage. + */ + @JsonIgnore + @Description("The GcsUtil instance that should be used to communicate with Google Cloud Storage.") + @Default.InstanceFactory(GcsUtil.GcsUtilFactory.class) + @Hidden + GcsUtil getGcsUtil(); + void setGcsUtil(GcsUtil value); + + /** + * The ExecutorService instance to use to create threads, can be overridden to specify an + * ExecutorService that is compatible with the users environment. If unset, the + * default is to create an ExecutorService with an unbounded number of threads; this + * is compatible with Google AppEngine. + */ + @JsonIgnore + @Description("The ExecutorService instance to use to create multiple threads. Can be overridden " + + "to specify an ExecutorService that is compatible with the users environment. If unset, " + + "the default is to create an ExecutorService with an unbounded number of threads; this " + + "is compatible with Google AppEngine.") + @Default.InstanceFactory(ExecutorServiceFactory.class) + @Hidden + ExecutorService getExecutorService(); + void setExecutorService(ExecutorService value); + + /** + * GCS endpoint to use. If unspecified, uses the default endpoint. + */ + @JsonIgnore + @Hidden + @Description("The URL for the GCS API.") + String getGcsEndpoint(); + void setGcsEndpoint(String value); + + /** + * The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for + * {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the + * restrictions and performance implications of this value. + */ + @Description("The buffer size (in bytes) to use when uploading files to GCS. Please see the " + + "documentation for AbstractGoogleAsyncWriteChannel.setUploadBufferSize for more " + + "information on the restrictions and performance implications of this value.\n\n" + + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/" + + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java") + @Nullable + Integer getGcsUploadBufferSizeBytes(); + void setGcsUploadBufferSizeBytes(@Nullable Integer bytes); + + /** + * The class of the validator that should be created and used to validate paths. + * If pathValidator has not been set explicitly, an instance of this class will be + * constructed and used as the path validator. + */ + @Description("The class of the validator that should be created and used to validate paths. " + + "If pathValidator has not been set explicitly, an instance of this class will be " + + "constructed and used as the path validator.") + @Default.Class(GcsPathValidator.class) + Class<? extends PathValidator> getPathValidatorClass(); + void setPathValidatorClass(Class<? extends PathValidator> validatorClass); + + /** + * The path validator instance that should be used to validate paths. + * If no path validator has been set explicitly, the default is to use the instance factory that + * constructs a path validator based upon the currently set pathValidatorClass. + */ + @JsonIgnore + @Description("The path validator instance that should be used to validate paths. " + + "If no path validator has been set explicitly, the default is to use the instance factory " + + "that constructs a path validator based upon the currently set pathValidatorClass.") + @Default.InstanceFactory(PathValidatorFactory.class) + PathValidator getPathValidator(); + void setPathValidator(PathValidator validator); + + /** + * Returns the default {@link ExecutorService} to use within the Apache Beam SDK. The + * {@link ExecutorService} is compatible with AppEngine. + */ + class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> { + @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only. + @Override + public ExecutorService create(PipelineOptions options) { + ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder(); + threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory()); + threadFactoryBuilder.setDaemon(true); + /* The SDK requires an unbounded thread pool because a step may create X writers + * each requiring their own thread to perform the writes otherwise a writer may + * block causing deadlock for the step because the writers buffer is full. + * Also, the MapTaskExecutor launches the steps in reverse order and completes + * them in forward order thus requiring enough threads so that each step's writers + * can be active. + */ + return new ThreadPoolExecutor( + 0, Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads. + Long.MAX_VALUE, TimeUnit.NANOSECONDS, // Keep non-core threads alive forever. + new SynchronousQueue<Runnable>(), + threadFactoryBuilder.build()); + } + } + + /** + * Creates a {@link PathValidator} object using the class specified in + * {@link #getPathValidatorClass()}. + */ + class PathValidatorFactory implements DefaultValueFactory<PathValidator> { + @Override + public PathValidator create(PipelineOptions options) { + GcsOptions gcsOptions = options.as(GcsOptions.class); + return InstanceBuilder.ofType(PathValidator.class) + .fromClass(gcsOptions.getPathValidatorClass()) + .fromFactoryMethod("fromOptions") + .withArg(PipelineOptions.class, options) + .build(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java new file mode 100644 index 0000000..01144c4 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GoogleApiDebugOptions.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.options; + +import com.google.api.client.googleapis.services.AbstractGoogleClient; +import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; +import com.google.api.client.googleapis.services.GoogleClientRequestInitializer; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * These options configure debug settings for Google API clients created within the Apache Beam SDK. + */ +public interface GoogleApiDebugOptions extends PipelineOptions { + /** + * This option enables tracing of API calls to Google services used within the Apache + * Beam SDK. Values are expected in JSON format <code>{"ApiName":"TraceDestination",...} + * </code> where the {@code ApiName} represents the request classes canonical name. The + * {@code TraceDestination} is a logical trace consumer to whom the trace will be reported. + * Typically, "producer" is the right destination to use: this makes API traces available to the + * team offering the API. Note that by enabling this option, the contents of the requests to and + * from Google Cloud services will be made available to Google. For example, by specifying + * <code>{"Dataflow":"producer"}</code>, all calls to the Dataflow service will be made available + * to Google, specifically to the Google Cloud Dataflow team. + */ + @Description("This option enables tracing of API calls to Google services used within the Apache " + + "Beam SDK. Values are expected in JSON format {\"ApiName\":\"TraceDestination\",...} " + + "where the ApiName represents the request classes canonical name. The TraceDestination is " + + "a logical trace consumer to whom the trace will be reported. Typically, \"producer\" is " + + "the right destination to use: this makes API traces available to the team offering the " + + "API. Note that by enabling this option, the contents of the requests to and from " + + "Google Cloud services will be made available to Google. For example, by specifying " + + "{\"Dataflow\":\"producer\"}, all calls to the Dataflow service will be made available to " + + "Google, specifically to the Google Cloud Dataflow team.") + GoogleApiTracer getGoogleApiTrace(); + void setGoogleApiTrace(GoogleApiTracer commands); + + /** + * A {@link GoogleClientRequestInitializer} that adds the trace destination to Google API calls. + */ + class GoogleApiTracer extends HashMap<String, String> + implements GoogleClientRequestInitializer { + /** + * Creates a {@link GoogleApiTracer} that sets the trace destination on all + * calls that match the given client type. + */ + public GoogleApiTracer addTraceFor(AbstractGoogleClient client, String traceDestination) { + put(client.getClass().getCanonicalName(), traceDestination); + return this; + } + + /** + * Creates a {@link GoogleApiTracer} that sets the trace {@code traceDestination} on all + * calls that match for the given request type. + */ + public GoogleApiTracer addTraceFor( + AbstractGoogleClientRequest<?> request, String traceDestination) { + put(request.getClass().getCanonicalName(), traceDestination); + return this; + } + + @Override + public void initialize(AbstractGoogleClientRequest<?> request) throws IOException { + for (Map.Entry<String, String> entry : this.entrySet()) { + if (request.getClass().getCanonicalName().contains(entry.getKey())) { + request.set("$trace", entry.getValue()); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java new file mode 100644 index 0000000..bc9646c --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines {@link org.apache.beam.sdk.options.PipelineOptions} for + * configuring pipeline execution for Google Cloud Platform components. + */ +package org.apache.beam.sdk.extensions.gcp.options; http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java deleted file mode 100644 index 13fdaf3..0000000 --- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.options; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import org.apache.beam.sdk.util.GcpProjectUtil; - -/** - * Properties needed when using Google CloudResourceManager with the Apache Beam SDK. - */ -@Description("Options that are used to configure Google CloudResourceManager. See " - + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.") -public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions, - PipelineOptions, StreamingOptions { - /** - * The GcpProjectUtil instance that should be used to communicate with Google Cloud Storage. - */ - @JsonIgnore - @Description("The GcpProjectUtil instance that should be used to communicate" - + " with Google Cloud Resource Manager.") - @Default.InstanceFactory(GcpProjectUtil.GcpProjectUtilFactory.class) - @Hidden - GcpProjectUtil getGcpProjectUtil(); - void setGcpProjectUtil(GcpProjectUtil value); -} http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java deleted file mode 100644 index d01406f..0000000 --- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.options; - -import static com.google.common.base.Strings.isNullOrEmpty; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.auth.Credentials; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.io.Files; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.security.GeneralSecurityException; -import java.util.Locale; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.annotation.Nullable; -import org.apache.beam.sdk.util.CredentialFactory; -import org.apache.beam.sdk.util.DefaultBucket; -import org.apache.beam.sdk.util.GcpCredentialFactory; -import org.apache.beam.sdk.util.InstanceBuilder; -import org.apache.beam.sdk.util.PathValidator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Options used to configure Google Cloud Platform specific options such as the project - * and credentials. - * - * <p>These options defer to the - * <a href="https://developers.google.com/accounts/docs/application-default-credentials"> - * application default credentials</a> for authentication. See the - * <a href="https://github.com/google/google-auth-library-java">Google Auth Library</a> for - * alternative mechanisms for creating credentials. - */ -@Description("Options used to configure Google Cloud Platform project and credentials.") -public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { - /** - * Project id to use when launching jobs. - */ - @Description("Project id. Required when using Google Cloud Platform services. " - + "See https://cloud.google.com/storage/docs/projects for further details.") - @Default.InstanceFactory(DefaultProjectFactory.class) - String getProject(); - void setProject(String value); - - /** - * GCP <a href="https://developers.google.com/compute/docs/zones" - * >availability zone</a> for operations. - * - * <p>Default is set on a per-service basis. - */ - @Description("GCP availability zone for running GCP operations. " - + "Default is up to the individual service.") - String getZone(); - void setZone(String value); - - /** - * The class of the credential factory that should be created and used to create - * credentials. If gcpCredential has not been set explicitly, an instance of this class will - * be constructed and used as a credential factory. - */ - @Description("The class of the credential factory that should be created and used to create " - + "credentials. If gcpCredential has not been set explicitly, an instance of this class will " - + "be constructed and used as a credential factory.") - @Default.Class(GcpCredentialFactory.class) - Class<? extends CredentialFactory> getCredentialFactoryClass(); - void setCredentialFactoryClass( - Class<? extends CredentialFactory> credentialFactoryClass); - - /** - * The credential instance that should be used to authenticate against GCP services. - * If no credential has been set explicitly, the default is to use the instance factory - * that constructs a credential based upon the currently set credentialFactoryClass. - */ - @JsonIgnore - @Description("The credential instance that should be used to authenticate against GCP services. " - + "If no credential has been set explicitly, the default is to use the instance factory " - + "that constructs a credential based upon the currently set credentialFactoryClass.") - @Default.InstanceFactory(GcpUserCredentialsFactory.class) - Credentials getGcpCredential(); - void setGcpCredential(Credentials value); - - /** - * Attempts to infer the default project based upon the environment this application - * is executing within. Currently this only supports getting the default project from gcloud. - */ - class DefaultProjectFactory implements DefaultValueFactory<String> { - private static final Logger LOG = LoggerFactory.getLogger(DefaultProjectFactory.class); - - @Override - public String create(PipelineOptions options) { - try { - File configFile; - if (getEnvironment().containsKey("CLOUDSDK_CONFIG")) { - configFile = new File(getEnvironment().get("CLOUDSDK_CONFIG"), "properties"); - } else if (isWindows() && getEnvironment().containsKey("APPDATA")) { - configFile = new File(getEnvironment().get("APPDATA"), "gcloud/properties"); - } else { - // New versions of gcloud use this file - configFile = new File( - System.getProperty("user.home"), - ".config/gcloud/configurations/config_default"); - if (!configFile.exists()) { - // Old versions of gcloud use this file - configFile = new File(System.getProperty("user.home"), ".config/gcloud/properties"); - } - } - String section = null; - Pattern projectPattern = Pattern.compile("^project\\s*=\\s*(.*)$"); - Pattern sectionPattern = Pattern.compile("^\\[(.*)\\]$"); - for (String line : Files.readLines(configFile, StandardCharsets.UTF_8)) { - line = line.trim(); - if (line.isEmpty() || line.startsWith(";")) { - continue; - } - Matcher matcher = sectionPattern.matcher(line); - if (matcher.matches()) { - section = matcher.group(1); - } else if (section == null || section.equals("core")) { - matcher = projectPattern.matcher(line); - if (matcher.matches()) { - String project = matcher.group(1).trim(); - LOG.info("Inferred default GCP project '{}' from gcloud. If this is the incorrect " - + "project, please cancel this Pipeline and specify the command-line " - + "argument --project.", project); - return project; - } - } - } - } catch (IOException expected) { - LOG.debug("Failed to find default project.", expected); - } - // return null if can't determine - return null; - } - - /** - * Returns true if running on the Windows OS. - */ - private static boolean isWindows() { - return System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("windows"); - } - - /** - * Used to mock out getting environment variables. - */ - @VisibleForTesting - Map<String, String> getEnvironment() { - return System.getenv(); - } - } - - /** - * Attempts to load the GCP credentials. See - * {@link CredentialFactory#getCredential()} for more details. - */ - class GcpUserCredentialsFactory implements DefaultValueFactory<Credentials> { - @Override - public Credentials create(PipelineOptions options) { - GcpOptions gcpOptions = options.as(GcpOptions.class); - try { - CredentialFactory factory = InstanceBuilder.ofType(CredentialFactory.class) - .fromClass(gcpOptions.getCredentialFactoryClass()) - .fromFactoryMethod("fromOptions") - .withArg(PipelineOptions.class, options) - .build(); - return factory.getCredential(); - } catch (IOException | GeneralSecurityException e) { - throw new RuntimeException("Unable to obtain credential", e); - } - } - } - - /** - * A GCS path for storing temporary files in GCP. - * - * <p>Its default to {@link PipelineOptions#getTempLocation}. - */ - @Description("A GCS path for storing temporary files in GCP.") - @Default.InstanceFactory(GcpTempLocationFactory.class) - @Nullable String getGcpTempLocation(); - void setGcpTempLocation(String value); - - /** - * Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location. - */ - class GcpTempLocationFactory implements DefaultValueFactory<String> { - - @Override - @Nullable - public String create(PipelineOptions options) { - String tempLocation = options.getTempLocation(); - if (isNullOrEmpty(tempLocation)) { - tempLocation = DefaultBucket.tryCreateDefaultBucket(options); - options.setTempLocation(tempLocation); - } else { - try { - PathValidator validator = options.as(GcsOptions.class).getPathValidator(); - validator.validateOutputFilePrefixSupported(tempLocation); - } catch (Exception e) { - throw new IllegalArgumentException(String.format( - "Error constructing default value for gcpTempLocation: tempLocation is not" - + " a valid GCS path, %s. ", tempLocation), e); - } - } - return tempLocation; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0aed801a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java deleted file mode 100644 index 411121c..0000000 --- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.options; - -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableList; - -/** - * A registrar containing the default GCP options. - */ -@AutoService(PipelineOptionsRegistrar.class) -public class GcpPipelineOptionsRegistrar implements PipelineOptionsRegistrar { - @Override - public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { - return ImmutableList.<Class<? extends PipelineOptions>>builder() - .add(GcpOptions.class) - .add(GcsOptions.class) - .add(GoogleApiDebugOptions.class) - .build(); - } -}
