Make TestPipeline slightly less DataflowPipelineRunner-centric ----Release Notes----
[] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115302769 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/13a042ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/13a042ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/13a042ae Branch: refs/heads/master Commit: 13a042aed7d01a126a1ff7ecb66e723474191fe0 Parents: c0a814b Author: klk <[email protected]> Authored: Mon Feb 22 21:17:40 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:26 2016 -0800 ---------------------------------------------------------------------- .../dataflow/sdk/testing/TestPipeline.java | 69 ++++++++++++-------- .../dataflow/sdk/testing/TestPipelineTest.java | 18 +++-- 2 files changed, 53 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13a042ae/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java index 05b5bad..a05a778 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/TestPipeline.java @@ -19,22 +19,23 @@ package com.google.cloud.dataflow.sdk.testing; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.PipelineResult; import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions; +import com.google.cloud.dataflow.sdk.options.GcpOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptions.CheckEnabled; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; +import com.google.cloud.dataflow.sdk.util.TestCredential; import com.google.common.base.Optional; import com.google.common.collect.Iterators; import com.fasterxml.jackson.databind.ObjectMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.Iterator; +import javax.annotation.Nullable; + /** * A creator of test pipelines that can be used inside of tests that can be * configured to run locally or against the live service. @@ -67,7 +68,6 @@ import java.util.Iterator; */ public class TestPipeline extends Pipeline { private static final String PROPERTY_DATAFLOW_OPTIONS = "dataflowOptions"; - private static final Logger LOG = LoggerFactory.getLogger(TestPipeline.class); private static final ObjectMapper MAPPER = new ObjectMapper(); /** @@ -77,28 +77,22 @@ public class TestPipeline extends Pipeline { * {@link Pipeline#run} to execute the pipeline and check the tests. */ public static TestPipeline create() { - if (isIntegrationTest()) { - TestDataflowPipelineOptions options = getPipelineOptions(); - LOG.info("Using passed in options: " + options); - options.setStableUniqueNames(CheckEnabled.ERROR); - return new TestPipeline(TestDataflowPipelineRunner.fromOptions(options), options); - } else { - DirectPipelineRunner directRunner = DirectPipelineRunner.createForTest(); - directRunner.getPipelineOptions().setAppName(getAppName()); - directRunner.getPipelineOptions().setStableUniqueNames(CheckEnabled.ERROR); - return new TestPipeline(directRunner, directRunner.getPipelineOptions()); - } + return fromOptions(testingPipelineOptions()); + } + + public static TestPipeline fromOptions(PipelineOptions options) { + return new TestPipeline(PipelineRunner.fromOptions(options), options); } /** - * Returns whether this test is running on the Cloud Dataflow service as described - * in {@link TestPipeline}. + * Returns whether a {@link TestPipeline} supports dynamic work rebalancing, and thus tests + * of dynamic work rebalancing are expected to pass. */ - public static boolean isIntegrationTest() { - return Boolean.parseBoolean(System.getProperty("runIntegrationTestOnService")); + public boolean supportsDynamicWorkRebalancing() { + return getRunner() instanceof DataflowPipelineRunner; } - TestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions options) { + private TestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions options) { super(runner, options); } @@ -126,14 +120,28 @@ public class TestPipeline extends Pipeline { } /** - * Creates PipelineOptions for testing with a DataflowPipelineRunner. + * Creates {@link PipelineOptions} for testing. */ - public static TestDataflowPipelineOptions getPipelineOptions() { + public static PipelineOptions testingPipelineOptions() { try { - TestDataflowPipelineOptions options = PipelineOptionsFactory.fromArgs( - MAPPER.readValue(System.getProperty(PROPERTY_DATAFLOW_OPTIONS), String[].class)) - .as(TestDataflowPipelineOptions.class); - options.setAppName(getAppName()); + @Nullable String systemDataflowOptions = System.getProperty(PROPERTY_DATAFLOW_OPTIONS); + PipelineOptions options = + systemDataflowOptions == null + ? PipelineOptionsFactory.create() + : PipelineOptionsFactory.fromArgs( + MAPPER.readValue( + System.getProperty(PROPERTY_DATAFLOW_OPTIONS), String[].class)) + .as(PipelineOptions.class); + + options.as(ApplicationNameOptions.class).setAppName(getAppName()); + if (isIntegrationTest()) { + // TODO: adjust everyone's integration test frameworks to set the runner class via the + // pipeline options via PROPERTY_DATAFLOW_OPTIONS + options.setRunner(TestDataflowPipelineRunner.class); + } else { + options.as(GcpOptions.class).setGcpCredential(new TestCredential()); + } + options.setStableUniqueNames(CheckEnabled.ERROR); return options; } catch (IOException e) { throw new RuntimeException("Unable to instantiate test options from system property " @@ -141,6 +149,13 @@ public class TestPipeline extends Pipeline { } } + /** + * Returns whether a {@link TestPipeline} should be treated as an integration test. + */ + private static boolean isIntegrationTest() { + return Boolean.parseBoolean(System.getProperty("runIntegrationTestOnService")); + } + /** Returns the class + method name of the test, or a default name. */ private static String getAppName() { Optional<StackTraceElement> stackTraceElement = findCallersStackTrace(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13a042ae/sdk/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java index d74ba6a..397920a 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java @@ -21,6 +21,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; +import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.GcpOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; import com.fasterxml.jackson.databind.ObjectMapper; @@ -57,15 +61,17 @@ public class TestPipelineTest { "--diskSizeGb=2" }); System.getProperties().put("dataflowOptions", stringOptions); - TestDataflowPipelineOptions options = TestPipeline.getPipelineOptions(); + DataflowPipelineOptions options = + TestPipeline.testingPipelineOptions().as(DataflowPipelineOptions.class); assertEquals(DataflowPipelineRunner.class, options.getRunner()); assertThat(options.getJobName(), startsWith("testpipelinetest0testcreationofpipelineoptions-")); - assertEquals("testProject", options.getProject()); + assertEquals("testProject", options.as(GcpOptions.class).getProject()); assertEquals("testApiRootUrl", options.getApiRootUrl()); assertEquals("testDataflowEndpoint", options.getDataflowEndpoint()); assertEquals("testTempLocation", options.getTempLocation()); assertEquals("testServiceAccountName", options.getServiceAccountName()); - assertEquals("testServiceAccountKeyfile", options.getServiceAccountKeyfile()); + assertEquals( + "testServiceAccountKeyfile", options.as(GcpOptions.class).getServiceAccountKeyfile()); assertEquals("testZone", options.getZone()); assertEquals(2, options.getDiskSizeGb()); } @@ -75,11 +81,9 @@ public class TestPipelineTest { ObjectMapper mapper = new ObjectMapper(); String stringOptions = mapper.writeValueAsString(new String[]{}); System.getProperties().put("dataflowOptions", stringOptions); - TestDataflowPipelineOptions options = TestPipeline.getPipelineOptions(); - assertThat(options.getAppName(), startsWith( + PipelineOptions options = TestPipeline.testingPipelineOptions(); + assertThat(options.as(ApplicationNameOptions.class).getAppName(), startsWith( "TestPipelineTest-testCreationOfPipelineOptionsFromReallyVerboselyNamedTestCase")); - assertThat(options.getJobName(), startsWith( - "testpipelinetest0testcreationofpipelineoptionsfrom")); } @Test
