Register TestSparkPipelineOptions only in src/test to avoid hard hamcrest dep
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95d33c52 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95d33c52 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95d33c52 Branch: refs/heads/master Commit: 95d33c521788ce8046201a6baf22e46950560cf1 Parents: c4adbd3 Author: Kenneth Knowles <[email protected]> Authored: Tue May 9 12:44:19 2017 -0700 Committer: Davor Bonaci <[email protected]> Committed: Tue May 9 17:06:57 2017 -0700 ---------------------------------------------------------------------- .../runners/spark/SparkRunnerRegistrar.java | 4 +-- .../beam/runners/spark/TestSparkRunner.java | 29 ++++++++-------- .../runners/spark/SparkRunnerRegistrarTest.java | 2 +- .../TestSparkPipelineOptionsRegistrar.java | 36 ++++++++++++++++++++ 4 files changed, 52 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/95d33c52/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java index e2e5ceb..325c86d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java @@ -54,9 +54,7 @@ public final class SparkRunnerRegistrar { public static class Options implements PipelineOptionsRegistrar { @Override public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { - return ImmutableList.<Class<? extends PipelineOptions>>of( - SparkPipelineOptions.class, - TestSparkPipelineOptions.class); + return ImmutableList.<Class<? extends PipelineOptions>>of(SparkPipelineOptions.class); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/95d33c52/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index 6d10b75..eccee57 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -83,26 +83,25 @@ import org.slf4j.LoggerFactory; public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { private static final Logger LOG = LoggerFactory.getLogger(TestSparkRunner.class); - private final TestSparkPipelineOptions testSparkPipelineOptions; - + private final PipelineOptions options; private SparkRunner delegate; - private boolean isForceStreaming; - private TestSparkRunner(TestSparkPipelineOptions options) { + private TestSparkRunner(PipelineOptions options) { this.delegate = SparkRunner.fromOptions(options); - this.isForceStreaming = options.isForceStreaming(); - this.testSparkPipelineOptions = options; + this.options = options; } public static TestSparkRunner fromOptions(PipelineOptions options) { - // Default options suffice to set it up as a test runner - TestSparkPipelineOptions sparkOptions = - PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, options); - return new TestSparkRunner(sparkOptions); + return new TestSparkRunner(options); } @Override public SparkPipelineResult run(Pipeline pipeline) { + // Default options suffice to set it up as a test runner + TestSparkPipelineOptions testSparkOptions = + PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, options); + + boolean isForceStreaming = testSparkOptions.isForceStreaming(); // if the pipeline forces execution as a streaming pipeline, // and the source is an adapted unbounded source (as bounded), // read it as unbounded source via UnboundedReadFromBoundedSource. @@ -116,13 +115,13 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { MetricsAccumulator.clear(); GlobalWatermarkHolder.clear(); - LOG.info("About to run test pipeline " + testSparkPipelineOptions.getJobName()); + LOG.info("About to run test pipeline " + options.getJobName()); // if the pipeline was executed in streaming mode, validate aggregators. if (isForceStreaming) { try { result = delegate.run(pipeline); - awaitWatermarksOrTimeout(testSparkPipelineOptions, result); + awaitWatermarksOrTimeout(testSparkOptions, result); result.stop(); PipelineResult.State finishState = result.getState(); // assert finish state. @@ -133,7 +132,7 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { } finally { try { // cleanup checkpoint dir. - FileUtils.deleteDirectory(new File(testSparkPipelineOptions.getCheckpointDir())); + FileUtils.deleteDirectory(new File(testSparkOptions.getCheckpointDir())); } catch (IOException e) { throw new RuntimeException("Failed to clear checkpoint tmp dir.", e); } @@ -150,8 +149,8 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { finishState, is(PipelineResult.State.DONE)); // assert via matchers. - assertThat(result, testSparkPipelineOptions.getOnCreateMatcher()); - assertThat(result, testSparkPipelineOptions.getOnSuccessMatcher()); + assertThat(result, testSparkOptions.getOnCreateMatcher()); + assertThat(result, testSparkOptions.getOnSuccessMatcher()); } return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/95d33c52/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java index 75899f9..4e1fd7c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java @@ -38,7 +38,7 @@ public class SparkRunnerRegistrarTest { @Test public void testOptions() { assertEquals( - ImmutableList.of(SparkPipelineOptions.class, TestSparkPipelineOptions.class), + ImmutableList.of(SparkPipelineOptions.class), new SparkRunnerRegistrar.Options().getPipelineOptions()); } http://git-wip-us.apache.org/repos/asf/beam/blob/95d33c52/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsRegistrar.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsRegistrar.java new file mode 100644 index 0000000..e71880b --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsRegistrar.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; + +/** + * A registrar for {@link TestSparkPipelineOptions} to temporarily work around some complexities in + * {@link PipelineOptions} parsing. + */ +@AutoService(PipelineOptionsRegistrar.class) +public final class TestSparkPipelineOptionsRegistrar implements PipelineOptionsRegistrar { + @Override + public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { + return ImmutableList.<Class<? extends PipelineOptions>>of(TestSparkPipelineOptions.class); + } +}
