Repository: incubator-beam Updated Branches: refs/heads/master 402fb70c4 -> dc98211cc
Add CrashingRunner for use in TestPipeline CrashingRunner is a PipelineRunner that crashes on calls to run() with an IllegalArgumentException. As a runner is currently required to construct a Pipeline object, this allows removal of all Pipeline Runners from the core SDK while retaining tests that depend only on the graph construction behavior. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dc98211c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dc98211c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dc98211c Branch: refs/heads/master Commit: dc98211ccf17e94afb03ba51992c731684f855fa Parents: 402fb70 Author: Thomas Groh <[email protected]> Authored: Wed May 18 13:37:13 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu May 19 18:44:49 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/testing/CrashingRunner.java | 72 +++++++++++++++++++ .../apache/beam/sdk/testing/TestPipeline.java | 10 ++- .../beam/sdk/testing/CrashingRunnerTest.java | 76 ++++++++++++++++++++ .../beam/sdk/testing/TestPipelineTest.java | 17 ++++- 4 files changed, 172 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc98211c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java new file mode 100644 index 0000000..975facc --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CrashingRunner.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.AggregatorRetrievalException; +import org.apache.beam.sdk.runners.AggregatorValues; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.Aggregator; + +/** + * A {@link PipelineRunner} that applies no overrides and throws an exception on calls to + * {@link Pipeline#run()}. For use in {@link TestPipeline} to construct but not execute pipelines. + */ +public class CrashingRunner extends PipelineRunner<PipelineResult>{ + + public static CrashingRunner fromOptions(PipelineOptions opts) { + return new CrashingRunner(); + } + + @Override + public PipelineResult run(Pipeline pipeline) { + throw new IllegalArgumentException(String.format("Cannot call #run(Pipeline) on an instance " + + "of %s. %s should only be used as the default to construct a Pipeline " + + "using %s, and cannot execute Pipelines. Instead, specify a %s " + + "by providing PipelineOptions in the environment variable '%s'.", + getClass().getSimpleName(), + getClass().getSimpleName(), + TestPipeline.class.getSimpleName(), + PipelineRunner.class.getSimpleName(), + TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS)); + } + + private static class TestPipelineResult implements PipelineResult { + private TestPipelineResult() { + // Should never be instantiated by the enclosing class + throw new UnsupportedOperationException(String.format("Forbidden to instantiate %s", + getClass().getSimpleName())); + } + + @Override + public State getState() { + throw new UnsupportedOperationException(String.format("Forbidden to instantiate %s", + getClass().getSimpleName())); + } + + @Override + public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) + throws AggregatorRetrievalException { + throw new AssertionError(String.format("Forbidden to instantiate %s", + getClass().getSimpleName())); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc98211c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index a4921d5..4618e33 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -84,7 +84,8 @@ import javax.annotation.Nullable; * containing the message from the {@link PAssert} that failed. */ public class TestPipeline extends Pipeline { - private static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions"; + static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions"; + static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner"; private static final ObjectMapper MAPPER = new ObjectMapper(); /** @@ -145,8 +146,13 @@ public class TestPipeline extends Pipeline { .as(TestPipelineOptions.class); options.as(ApplicationNameOptions.class).setAppName(getAppName()); - // If no options were specified, use a test credential object on all pipelines. + // If no options were specified, set some reasonable defaults if (Strings.isNullOrEmpty(beamTestPipelineOptions)) { + // If there are no provided options, check to see if a dummy runner should be used. + String useDefaultDummy = System.getProperty(PROPERTY_USE_DEFAULT_DUMMY_RUNNER); + if (!Strings.isNullOrEmpty(useDefaultDummy) && Boolean.valueOf(useDefaultDummy)) { + options.setRunner(CrashingRunner.class); + } options.as(GcpOptions.class).setGcpCredential(new TestCredential()); } options.setStableUniqueNames(CheckEnabled.ERROR); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc98211c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java new file mode 100644 index 0000000..041a73a --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CrashingRunnerTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.Create; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link CrashingRunner}. + */ +@RunWith(JUnit4.class) +public class CrashingRunnerTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void fromOptionsCreatesInstance() { + PipelineOptions opts = PipelineOptionsFactory.create(); + opts.setRunner(CrashingRunner.class); + PipelineRunner<? extends PipelineResult> runner = PipelineRunner.fromOptions(opts); + + assertTrue("Should have created a CrashingRunner", runner instanceof CrashingRunner); + } + + @Test + public void applySucceeds() { + PipelineOptions opts = PipelineOptionsFactory.create(); + opts.setRunner(CrashingRunner.class); + + Pipeline p = Pipeline.create(opts); + p.apply(Create.of(1, 2, 3)); + } + + @Test + public void runThrows() { + PipelineOptions opts = PipelineOptionsFactory.create(); + opts.setRunner(CrashingRunner.class); + + Pipeline p = Pipeline.create(opts); + p.apply(Create.of(1, 2, 3)); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Cannot call #run"); + thrown.expectMessage(TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS); + + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc98211c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java index 8af4ff2..b741e2e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java @@ -17,8 +17,8 @@ */ package org.apache.beam.sdk.testing; -import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; @@ -29,6 +29,7 @@ import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.DirectPipelineRunner; +import org.apache.beam.sdk.transforms.Create; import com.fasterxml.jackson.databind.ObjectMapper; @@ -36,6 +37,7 @@ import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TestRule; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -49,6 +51,7 @@ import java.util.UUID; @RunWith(JUnit4.class) public class TestPipelineTest { @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); + @Rule public ExpectedException thrown = ExpectedException.none(); @Test public void testCreationUsingDefaults() { @@ -139,6 +142,18 @@ public class TestPipelineTest { assertEquals(m2, newOpts.getOnSuccessMatcher()); } + @Test + public void testRunWithDummyEnvironmentVariableFails() { + System.getProperties() + .setProperty(TestPipeline.PROPERTY_USE_DEFAULT_DUMMY_RUNNER, Boolean.toString(true)); + TestPipeline pipeline = TestPipeline.create(); + pipeline.apply(Create.of(1, 2, 3)); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Cannot call #run"); + pipeline.run(); + } + /** * TestMatcher is a matcher designed for testing matcher serialization/deserialization. */
