Repository: incubator-beam Updated Branches: refs/heads/master 0e4d0a9ae -> d9cdcadf5
Remove Pipeline from TestDataflowPipelineRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/380c1a83 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/380c1a83 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/380c1a83 Branch: refs/heads/master Commit: 380c1a83a82aa8b151d309cba82df3e2fb9398ec Parents: 0e4d0a9 Author: Thomas Groh <[email protected]> Authored: Fri Jun 17 16:36:22 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Mon Jun 20 09:27:48 2016 -0700 ---------------------------------------------------------------------- examples/java/README.md | 4 +- .../beam/runners/flink/examples/TFIDF.java | 2 +- .../testing/TestDataflowPipelineRunner.java | 271 ------------------- .../dataflow/testing/TestDataflowRunner.java | 271 +++++++++++++++++++ .../testing/TestDataflowRunnerTest.java | 40 +-- 5 files changed, 294 insertions(+), 294 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/380c1a83/examples/java/README.md ---------------------------------------------------------------------- diff --git a/examples/java/README.md b/examples/java/README.md index ef3cf07..2b5edf5 100644 --- a/examples/java/README.md +++ b/examples/java/README.md @@ -64,7 +64,7 @@ the same pipeline on fully managed resources in Google Cloud Platform: -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--project=<YOUR CLOUD PLATFORM PROJECT ID> \ --tempLocation=<YOUR CLOUD STORAGE LOCATION> \ - --runner=BlockingDataflowPipelineRunner" + --runner=BlockingDataflowRunner" Make sure to use your project id, not the project number or the descriptive name. The Cloud Storage location should be entered in the form of @@ -86,7 +86,7 @@ Platform: org.apache.beam.examples.WordCount \ --project=<YOUR CLOUD PLATFORM PROJECT ID> \ --tempLocation=<YOUR CLOUD STORAGE LOCATION> \ - --runner=BlockingDataflowPipelineRunner + --runner=BlockingDataflowRunner Other examples can be run similarly by replacing the `WordCount` class path with the example classpath, e.g. `org.apache.beam.examples.cookbook.BigQueryTornadoes`, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/380c1a83/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java index 0afde0a..876ecde 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java @@ -82,7 +82,7 @@ import java.util.Set; * <pre>{@code * --project=YOUR_PROJECT_ID * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowPipelineRunner + * --runner=BlockingDataflowRunner * and an output prefix on GCS: * --output=gs://YOUR_OUTPUT_PREFIX * }</pre> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/380c1a83/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java deleted file mode 100644 index f83a139..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.testing; - -import static org.hamcrest.MatcherAssert.assertThat; - -import org.apache.beam.runners.dataflow.DataflowJobExecutionException; -import org.apache.beam.runners.dataflow.DataflowPipelineJob; -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.util.MonitoringUtil; -import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult.State; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestPipelineOptions; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; - -import com.google.api.services.dataflow.model.JobMessage; -import com.google.api.services.dataflow.model.JobMetrics; -import com.google.api.services.dataflow.model.MetricUpdate; -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.base.Throwables; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.math.BigDecimal; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -/** - * {@link TestDataflowPipelineRunner} is a pipeline runner that wraps a - * {@link DataflowRunner} when running tests against the {@link TestPipeline}. - * - * @see TestPipeline - */ -public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> { - private static final String TENTATIVE_COUNTER = "tentative"; - private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class); - - private final TestDataflowPipelineOptions options; - private final DataflowRunner runner; - private int expectedNumberOfAssertions = 0; - - TestDataflowPipelineRunner(TestDataflowPipelineOptions options) { - this.options = options; - this.runner = DataflowRunner.fromOptions(options); - } - - /** - * Constructs a runner from the provided options. - */ - public static TestDataflowPipelineRunner fromOptions( - PipelineOptions options) { - TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class); - dataflowOptions.setStagingLocation(Joiner.on("/").join( - new String[]{dataflowOptions.getTempRoot(), - dataflowOptions.getJobName(), "output", "results"})); - - return new TestDataflowPipelineRunner(dataflowOptions); - } - - @Override - public DataflowPipelineJob run(Pipeline pipeline) { - return run(pipeline, runner); - } - - DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) { - - TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class); - final DataflowPipelineJob job; - try { - job = runner.run(pipeline); - } catch (DataflowJobExecutionException ex) { - throw new IllegalStateException("The dataflow failed."); - } - - LOG.info("Running Dataflow job {} with {} expected assertions.", - job.getJobId(), expectedNumberOfAssertions); - - assertThat(job, testPipelineOptions.getOnCreateMatcher()); - - CancelWorkflowOnError messageHandler = new CancelWorkflowOnError( - job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput())); - - try { - final Optional<Boolean> result; - - if (options.isStreaming()) { - Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit( - new Callable<Optional<Boolean>>() { - @Override - public Optional<Boolean> call() throws Exception { - try { - for (;;) { - Optional<Boolean> result = checkForSuccess(job); - if (result.isPresent()) { - return result; - } - Thread.sleep(10000L); - } - } finally { - LOG.info("Cancelling Dataflow job {}", job.getJobId()); - job.cancel(); - } - } - }); - State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler); - if (finalState == null || finalState == State.RUNNING) { - LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.", - job.getJobId()); - job.cancel(); - } - result = resultFuture.get(); - } else { - job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler); - result = checkForSuccess(job); - } - if (!result.isPresent()) { - throw new IllegalStateException( - "The dataflow did not output a success or failure metric."); - } else if (!result.get()) { - throw new AssertionError(messageHandler.getErrorMessage() == null - ? "The dataflow did not return a failure reason." - : messageHandler.getErrorMessage()); - } else { - assertThat(job, testPipelineOptions.getOnSuccessMatcher()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - Throwables.propagateIfPossible(e.getCause()); - throw new RuntimeException(e.getCause()); - } catch (IOException e) { - throw new RuntimeException(e); - } - return job; - } - - @Override - public <OutputT extends POutput, InputT extends PInput> OutputT apply( - PTransform<InputT, OutputT> transform, InputT input) { - if (transform instanceof PAssert.OneSideInputAssert - || transform instanceof PAssert.GroupThenAssert - || transform instanceof PAssert.GroupThenAssertForSingleton) { - expectedNumberOfAssertions += 1; - } - - return runner.apply(transform, input); - } - - Optional<Boolean> checkForSuccess(DataflowPipelineJob job) - throws IOException { - State state = job.getState(); - if (state == State.FAILED || state == State.CANCELLED) { - LOG.info("The pipeline failed"); - return Optional.of(false); - } - - JobMetrics metrics = job.getDataflowClient().projects().jobs() - .getMetrics(job.getProjectId(), job.getJobId()).execute(); - - if (metrics == null || metrics.getMetrics() == null) { - LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId()); - } else { - int successes = 0; - int failures = 0; - for (MetricUpdate metric : metrics.getMetrics()) { - if (metric.getName() == null || metric.getName().getContext() == null - || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) { - // Don't double count using the non-tentative version of the metric. - continue; - } - if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) { - successes += ((BigDecimal) metric.getScalar()).intValue(); - } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) { - failures += ((BigDecimal) metric.getScalar()).intValue(); - } - } - - if (failures > 0) { - LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " - + "{} expected assertions.", job.getJobId(), successes, failures, - expectedNumberOfAssertions); - return Optional.of(false); - } else if (successes >= expectedNumberOfAssertions) { - LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " - + "{} expected assertions.", job.getJobId(), successes, failures, - expectedNumberOfAssertions); - return Optional.of(true); - } - - LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected " - + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions); - } - - return Optional.<Boolean>absent(); - } - - @Override - public String toString() { - return "TestDataflowPipelineRunner#" + options.getAppName(); - } - - /** - * Cancels the workflow on the first error message it sees. - * - * <p>Creates an error message representing the concatenation of all error messages seen. - */ - private static class CancelWorkflowOnError implements JobMessagesHandler { - private final DataflowPipelineJob job; - private final JobMessagesHandler messageHandler; - private final StringBuffer errorMessage; - private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) { - this.job = job; - this.messageHandler = messageHandler; - this.errorMessage = new StringBuffer(); - } - - @Override - public void process(List<JobMessage> messages) { - messageHandler.process(messages); - for (JobMessage message : messages) { - if (message.getMessageImportance() != null - && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) { - LOG.info("Dataflow job {} threw exception. Failure message was: {}", - job.getJobId(), message.getMessageText()); - errorMessage.append(message.getMessageText()); - } - } - if (errorMessage.length() > 0) { - LOG.info("Cancelling Dataflow job {}", job.getJobId()); - try { - job.cancel(); - } catch (Exception ignore) { - // The TestDataflowPipelineRunner will thrown an AssertionError with the job failure - // messages. - } - } - } - - private String getErrorMessage() { - return errorMessage.toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/380c1a83/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java new file mode 100644 index 0000000..19a2178 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.testing; + +import static org.hamcrest.MatcherAssert.assertThat; + +import org.apache.beam.runners.dataflow.DataflowJobExecutionException; +import org.apache.beam.runners.dataflow.DataflowPipelineJob; +import org.apache.beam.runners.dataflow.DataflowRunner; +import org.apache.beam.runners.dataflow.util.MonitoringUtil; +import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +import com.google.api.services.dataflow.model.JobMessage; +import com.google.api.services.dataflow.model.JobMetrics; +import com.google.api.services.dataflow.model.MetricUpdate; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * {@link TestDataflowRunner} is a pipeline runner that wraps a + * {@link DataflowRunner} when running tests against the {@link TestPipeline}. + * + * @see TestPipeline + */ +public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { + private static final String TENTATIVE_COUNTER = "tentative"; + private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class); + + private final TestDataflowPipelineOptions options; + private final DataflowRunner runner; + private int expectedNumberOfAssertions = 0; + + TestDataflowRunner(TestDataflowPipelineOptions options) { + this.options = options; + this.runner = DataflowRunner.fromOptions(options); + } + + /** + * Constructs a runner from the provided options. + */ + public static TestDataflowRunner fromOptions( + PipelineOptions options) { + TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class); + dataflowOptions.setStagingLocation(Joiner.on("/").join( + new String[]{dataflowOptions.getTempRoot(), + dataflowOptions.getJobName(), "output", "results"})); + + return new TestDataflowRunner(dataflowOptions); + } + + @Override + public DataflowPipelineJob run(Pipeline pipeline) { + return run(pipeline, runner); + } + + DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) { + + TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class); + final DataflowPipelineJob job; + try { + job = runner.run(pipeline); + } catch (DataflowJobExecutionException ex) { + throw new IllegalStateException("The dataflow failed."); + } + + LOG.info("Running Dataflow job {} with {} expected assertions.", + job.getJobId(), expectedNumberOfAssertions); + + assertThat(job, testPipelineOptions.getOnCreateMatcher()); + + CancelWorkflowOnError messageHandler = new CancelWorkflowOnError( + job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput())); + + try { + final Optional<Boolean> result; + + if (options.isStreaming()) { + Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit( + new Callable<Optional<Boolean>>() { + @Override + public Optional<Boolean> call() throws Exception { + try { + for (;;) { + Optional<Boolean> result = checkForSuccess(job); + if (result.isPresent()) { + return result; + } + Thread.sleep(10000L); + } + } finally { + LOG.info("Cancelling Dataflow job {}", job.getJobId()); + job.cancel(); + } + } + }); + State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler); + if (finalState == null || finalState == State.RUNNING) { + LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.", + job.getJobId()); + job.cancel(); + } + result = resultFuture.get(); + } else { + job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler); + result = checkForSuccess(job); + } + if (!result.isPresent()) { + throw new IllegalStateException( + "The dataflow did not output a success or failure metric."); + } else if (!result.get()) { + throw new AssertionError(messageHandler.getErrorMessage() == null + ? "The dataflow did not return a failure reason." + : messageHandler.getErrorMessage()); + } else { + assertThat(job, testPipelineOptions.getOnSuccessMatcher()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + Throwables.propagateIfPossible(e.getCause()); + throw new RuntimeException(e.getCause()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return job; + } + + @Override + public <OutputT extends POutput, InputT extends PInput> OutputT apply( + PTransform<InputT, OutputT> transform, InputT input) { + if (transform instanceof PAssert.OneSideInputAssert + || transform instanceof PAssert.GroupThenAssert + || transform instanceof PAssert.GroupThenAssertForSingleton) { + expectedNumberOfAssertions += 1; + } + + return runner.apply(transform, input); + } + + Optional<Boolean> checkForSuccess(DataflowPipelineJob job) + throws IOException { + State state = job.getState(); + if (state == State.FAILED || state == State.CANCELLED) { + LOG.info("The pipeline failed"); + return Optional.of(false); + } + + JobMetrics metrics = job.getDataflowClient().projects().jobs() + .getMetrics(job.getProjectId(), job.getJobId()).execute(); + + if (metrics == null || metrics.getMetrics() == null) { + LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId()); + } else { + int successes = 0; + int failures = 0; + for (MetricUpdate metric : metrics.getMetrics()) { + if (metric.getName() == null || metric.getName().getContext() == null + || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) { + // Don't double count using the non-tentative version of the metric. + continue; + } + if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) { + successes += ((BigDecimal) metric.getScalar()).intValue(); + } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) { + failures += ((BigDecimal) metric.getScalar()).intValue(); + } + } + + if (failures > 0) { + LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " + + "{} expected assertions.", job.getJobId(), successes, failures, + expectedNumberOfAssertions); + return Optional.of(false); + } else if (successes >= expectedNumberOfAssertions) { + LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " + + "{} expected assertions.", job.getJobId(), successes, failures, + expectedNumberOfAssertions); + return Optional.of(true); + } + + LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected " + + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions); + } + + return Optional.<Boolean>absent(); + } + + @Override + public String toString() { + return "TestDataflowRunner#" + options.getAppName(); + } + + /** + * Cancels the workflow on the first error message it sees. + * + * <p>Creates an error message representing the concatenation of all error messages seen. + */ + private static class CancelWorkflowOnError implements JobMessagesHandler { + private final DataflowPipelineJob job; + private final JobMessagesHandler messageHandler; + private final StringBuffer errorMessage; + private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) { + this.job = job; + this.messageHandler = messageHandler; + this.errorMessage = new StringBuffer(); + } + + @Override + public void process(List<JobMessage> messages) { + messageHandler.process(messages); + for (JobMessage message : messages) { + if (message.getMessageImportance() != null + && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) { + LOG.info("Dataflow job {} threw exception. Failure message was: {}", + job.getJobId(), message.getMessageText()); + errorMessage.append(message.getMessageText()); + } + } + if (errorMessage.length() > 0) { + LOG.info("Cancelling Dataflow job {}", job.getJobId()); + try { + job.cancel(); + } catch (Exception ignore) { + // The TestDataflowRunner will thrown an AssertionError with the job failure + // messages. + } + } + } + + private String getErrorMessage() { + return errorMessage.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/380c1a83/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java index 4067f08..cd99643 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -85,7 +85,7 @@ import java.math.BigDecimal; import java.util.Arrays; import java.util.concurrent.TimeUnit; -/** Tests for {@link TestDataflowPipelineRunner}. */ +/** Tests for {@link TestDataflowRunner}. */ @RunWith(JUnit4.class) public class TestDataflowRunnerTest { @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -110,14 +110,14 @@ public class TestDataflowRunnerTest { options.setTempRoot("gs://test"); options.setGcpCredential(new TestCredential()); options.setDataflowClient(service); - options.setRunner(TestDataflowPipelineRunner.class); + options.setRunner(TestDataflowRunner.class); options.setPathValidatorClass(NoopPathValidator.class); } @Test public void testToString() { - assertEquals("TestDataflowPipelineRunner#TestAppName", - new TestDataflowPipelineRunner(options).toString()); + assertEquals("TestDataflowRunner#TestAppName", + new TestDataflowRunner(options).toString()); } @Test @@ -135,7 +135,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); when(request.execute()).thenReturn( generateMockMetricResponse(true /* success */, true /* tentative */)); assertEquals(mockJob, runner.run(p, mockRunner)); @@ -156,7 +156,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); try { runner.run(p, mockRunner); } catch (AssertionError expected) { @@ -197,7 +197,7 @@ public class TestDataflowRunnerTest { when(request.execute()).thenReturn( generateMockMetricResponse(false /* success */, true /* tentative */)); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); try { runner.run(p, mockRunner); } catch (AssertionError expected) { @@ -228,7 +228,7 @@ public class TestDataflowRunnerTest { when(request.execute()).thenReturn( generateMockMetricResponse(true /* success */, true /* tentative */)); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); runner.run(p, mockRunner); } @@ -250,7 +250,7 @@ public class TestDataflowRunnerTest { when(request.execute()).thenReturn( generateMockMetricResponse(false /* success */, true /* tentative */)); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); try { runner.run(p, mockRunner); } catch (AssertionError expected) { @@ -269,7 +269,7 @@ public class TestDataflowRunnerTest { PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); when(request.execute()).thenReturn( generateMockMetricResponse(true /* success */, true /* tentative */)); doReturn(State.DONE).when(job).getState(); @@ -284,7 +284,7 @@ public class TestDataflowRunnerTest { PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); when(request.execute()).thenReturn( generateMockMetricResponse(false /* success */, true /* tentative */)); doReturn(State.DONE).when(job).getState(); @@ -299,7 +299,7 @@ public class TestDataflowRunnerTest { PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); when(request.execute()).thenReturn( generateMockMetricResponse(true /* success */, false /* tentative */)); doReturn(State.RUNNING).when(job).getState(); @@ -335,7 +335,7 @@ public class TestDataflowRunnerTest { PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); when(request.execute()).thenReturn( generateMockMetricResponse(true /* success */, false /* tentative */)); doReturn(State.FAILED).when(job).getState(); @@ -373,7 +373,7 @@ public class TestDataflowRunnerTest { when(request.execute()).thenReturn( generateMockMetricResponse(false /* success */, true /* tentative */)); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); try { runner.run(p, mockRunner); } catch (AssertionError expected) { @@ -401,7 +401,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); p.getOptions().as(TestPipelineOptions.class) .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); @@ -426,7 +426,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); p.getOptions().as(TestPipelineOptions.class) .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); @@ -453,7 +453,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); @@ -478,7 +478,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); @@ -505,7 +505,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestFailureMatcher()); @@ -537,7 +537,7 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestFailureMatcher());
