Finish removing DirectPipelineRunner references
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6ebc8abe Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6ebc8abe Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6ebc8abe Branch: refs/heads/master Commit: 6ebc8abe83b2417df9abe11c691a4f6cec4e66b6 Parents: 47dd61a Author: Thomas Groh <tg...@google.com> Authored: Fri Jun 17 13:22:26 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Fri Jun 17 15:11:01 2016 -0700 ---------------------------------------------------------------------- .../examples/common/DataflowExampleUtils.java | 4 +- .../examples/cookbook/DatastoreWordCount.java | 4 +- .../translation/TransformTranslatorTest.java | 2 +- .../java/org/apache/beam/sdk/io/PubsubIO.java | 2 +- .../java/org/apache/beam/sdk/io/TextIO.java | 2 +- .../beam/sdk/options/DirectPipelineOptions.java | 74 -------------------- .../beam/sdk/options/PipelineOptions.java | 7 +- .../beam/sdk/util/BigQueryTableRowIterator.java | 2 +- .../apache/beam/sdk/util/DoFnRunnerBase.java | 2 +- .../beam/sdk/runners/PipelineRunnerTest.java | 24 ++++--- .../main/java/common/DataflowExampleUtils.java | 2 +- .../src/main/java/StarterPipeline.java | 2 +- .../src/main/java/it/pkg/StarterPipeline.java | 2 +- 13 files changed, 28 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ebc8abe/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java index 5b1af6d..46b8af3 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java @@ -315,7 +315,7 @@ public class DataflowExampleUtils { } /** - * Do some runner setup: check that the DirectPipelineRunner is not used in conjunction with + * Do some runner setup: check that the DirectRunner is not used in conjunction with * streaming, and if streaming is specified, use the DataflowPipelineRunner. */ public void setupRunner() { @@ -413,7 +413,7 @@ public class DataflowExampleUtils { } } else { // Do nothing if the given PipelineResult doesn't support waitToFinish(), - // such as EvaluationResults returned by DirectPipelineRunner. + // such as EvaluationResults returned by DirectRunner. tearDown(); printPendingMessages(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ebc8abe/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java index bfaecdf..2d1f88c 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java @@ -193,7 +193,7 @@ public class DatastoreWordCount { /** * An example that creates a pipeline to populate DatastoreIO from a - * text input. Forces use of DirectPipelineRunner for local execution mode. + * text input. Forces use of DirectRunner for local execution mode. */ public static void writeDataToDatastore(Options options) { Pipeline p = Pipeline.create(options); @@ -247,7 +247,7 @@ public class DatastoreWordCount { /** * An example to demo how to use {@link DatastoreIO}. The runner here is - * customizable, which means users could pass either {@code DirectPipelineRunner} + * customizable, which means users could pass either {@code DirectRunner} * or {@code DataflowPipelineRunner} in the pipeline options. */ public static void main(String args[]) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ebc8abe/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java index 5fdfb49..b593316 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java @@ -53,7 +53,7 @@ public class TransformTranslatorTest { /** * Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline - * in DirectPipelineRunner and on SparkPipelineRunner, with the mapped dataflow-to-spark + * in DirectRunner and on SparkPipelineRunner, with the mapped dataflow-to-spark * transforms. Finally it makes sure that the results are the same for both runs. */ @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ebc8abe/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 2a5698c..c6de8b4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -364,7 +364,7 @@ public class PubsubIO { * the stream. * * <p>When running with a {@link PipelineRunner} that only supports bounded - * {@link PCollection PCollections} (such as {@link DirectPipelineRunner}), + * {@link PCollection PCollections} (such as {@link DirectRunner}), * only a bounded portion of the input Pub/Sub stream can be processed. As such, either * {@link Bound#maxNumRecords(int)} or {@link Bound#maxReadTime(Duration)} must be set. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ebc8abe/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index bbef072..dc50a8c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -111,7 +111,7 @@ import javax.annotation.Nullable; * }</pre> * * <h3>Permissions</h3> - * <p>When run using the {@link DirectPipelineRunner}, your pipeline can read and write text files + * <p>When run using the {@link DirectRunner}, your pipeline can read and write text files * on your local drive and remote text files on Google Cloud Storage that you have access to using * your {@code gcloud} credentials. When running in the Dataflow service, the pipeline can only * read and write files from GCS. For more information about permissions, see the Cloud Dataflow http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ebc8abe/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java deleted file mode 100644 index c2095e3..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.options; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.values.PCollection; - -import com.fasterxml.jackson.annotation.JsonIgnore; - -/** - * Options that can be used to configure the {@link DirectPipelineRunner}. - */ -public interface DirectPipelineOptions - extends ApplicationNameOptions, BigQueryOptions, GcsOptions, GcpOptions, PipelineOptions, - StreamingOptions { - - /** - * The random seed to use for pseudorandom behaviors in the {@link DirectPipelineRunner}. - * If not explicitly specified, a random seed will be generated. - */ - @JsonIgnore - @Description("The random seed to use for pseudorandom behaviors in the DirectPipelineRunner." - + " If not explicitly specified, a random seed will be generated.") - Long getDirectPipelineRunnerRandomSeed(); - void setDirectPipelineRunnerRandomSeed(Long value); - - /** - * Controls whether the runner should ensure that all of the elements of - * the pipeline, such as DoFns, can be serialized. - */ - @JsonIgnore - @Description("Controls whether the runner should ensure that all of the elements of the " - + "pipeline, such as DoFns, can be serialized.") - @Default.Boolean(true) - boolean isTestSerializability(); - void setTestSerializability(boolean testSerializability); - - /** - * Controls whether the runner should ensure that all of the elements of - * every {@link PCollection} can be encoded using the appropriate - * {@link Coder}. - */ - @JsonIgnore - @Description("Controls whether the runner should ensure that all of the elements of every " - + "PCollection can be encoded using the appropriate Coder.") - @Default.Boolean(true) - boolean isTestEncodability(); - void setTestEncodability(boolean testEncodability); - - /** - * Controls whether the runner should randomize the order of each - * {@link PCollection}. - */ - @JsonIgnore - @Description("Controls whether the runner should randomize the order of each PCollection.") - @Default.Boolean(true) - boolean isTestUnorderedness(); - void setTestUnorderedness(boolean testUnorderedness); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ebc8abe/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 456b6ae..e89e5ad 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -70,10 +70,9 @@ import javax.annotation.concurrent.ThreadSafe; * p.run(); * } * - * // To create options for the DirectPipeline: - * DirectPipelineOptions directPipelineOptions = - * PipelineOptionsFactory.as(DirectPipelineOptions.class); - * directPipelineOptions.setStreaming(true); + * // To create options for the DirectRunner: + * DirectOptions directRunnerOptions = + * PipelineOptionsFactory.as(DirectOptions.class); * * // To cast from one type to another using the as(Class) method: * DataflowPipelineOptions dataflowPipelineOptions = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ebc8abe/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java index ca1ac69..ad41a3f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java @@ -322,7 +322,7 @@ public class BigQueryTableRowIterator implements AutoCloseable { if (convertedValue == null) { // BigQuery does not include null values when the export operation (to JSON) is used. - // To match that behavior, BigQueryTableRowiterator, and the DirectPipelineRunner, + // To match that behavior, BigQueryTableRowiterator, and the DirectRunner, // intentionally omits columns with null values. continue; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ebc8abe/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java index 58b10a7..1ebe72b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java @@ -91,7 +91,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu /** * An implementation of {@code OutputManager} using simple lists, for testing and in-memory - * contexts such as the {@link DirectPipelineRunner}. + * contexts such as the {@link DirectRunner}. */ public static class ListOutputManager implements OutputManager { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ebc8abe/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java index 5d2e69d..fb8bb72 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java @@ -20,7 +20,9 @@ package org.apache.beam.sdk.runners; import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.options.ApplicationNameOptions; -import org.apache.beam.sdk.options.DirectPipelineOptions; +import org.apache.beam.sdk.options.GcpOptions; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.CrashingRunner; import org.apache.beam.sdk.util.GcsUtil; @@ -50,12 +52,12 @@ public class PipelineRunnerTest { @Test public void testLongName() { // Check we can create a pipeline runner using the full class name. - DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class); - options.setAppName("test"); - options.setProject("test"); - options.setGcsUtil(mockGcsUtil); + PipelineOptions options = PipelineOptionsFactory.create(); + options.as(ApplicationNameOptions.class).setAppName("test"); + options.as(GcpOptions.class).setProject("test"); + options.as(GcsOptions.class).setGcsUtil(mockGcsUtil); options.setRunner(CrashingRunner.class); - options.setGcpCredential(new TestCredential()); + options.as(GcpOptions.class).setGcpCredential(new TestCredential()); PipelineRunner<?> runner = PipelineRunner.fromOptions(options); assertTrue(runner instanceof CrashingRunner); } @@ -63,12 +65,12 @@ public class PipelineRunnerTest { @Test public void testShortName() { // Check we can create a pipeline runner using the short class name. - DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class); - options.setAppName("test"); - options.setProject("test"); - options.setGcsUtil(mockGcsUtil); + PipelineOptions options = PipelineOptionsFactory.create(); + options.as(ApplicationNameOptions.class).setAppName("test"); + options.as(GcpOptions.class).setProject("test"); + options.as(GcsOptions.class).setGcsUtil(mockGcsUtil); options.setRunner(CrashingRunner.class); - options.setGcpCredential(new TestCredential()); + options.as(GcpOptions.class).setGcpCredential(new TestCredential()); PipelineRunner<?> runner = PipelineRunner.fromOptions(options); assertTrue(runner instanceof CrashingRunner); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ebc8abe/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java index 76df4d4..6ec4540 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java @@ -315,7 +315,7 @@ public class DataflowExampleUtils { } } else { // Do nothing if the given PipelineResult doesn't support waitToFinish(), - // such as EvaluationResults returned by DirectPipelineRunner. + // such as EvaluationResults returned by DirectRunner. } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ebc8abe/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java index 2146b77..027431f 100644 --- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java +++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java @@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory; * <p>The example takes two strings, converts them to their upper-case * representation and logs them. * - * <p>To run this starter example locally using DirectPipelineRunner, just + * <p>To run this starter example locally using DirectRunner, just * execute it without any additional parameters from your favorite development * environment. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ebc8abe/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java index 6cd27e7..bb86b0d 100644 --- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java +++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java @@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory; * <p>The example takes two strings, converts them to their upper-case * representation and logs them. * - * <p>To run this starter example locally using DirectPipelineRunner, just + * <p>To run this starter example locally using DirectRunner, just * execute it without any additional parameters from your favorite development * environment. *