Move TestDataflowRunner into dataflow package
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d305d6d3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d305d6d3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d305d6d3 Branch: refs/heads/release-2.0.0 Commit: d305d6d382179c2eadd77dac3c603d73f65c80e4 Parents: a71f6cd Author: Kenneth Knowles <[email protected]> Authored: Sat May 6 05:22:03 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Sun May 7 16:13:11 2017 -0700 ---------------------------------------------------------------------- .../dataflow/DataflowPipelineRegistrar.java | 1 - .../dataflow/TestDataflowPipelineOptions.java | 28 + .../runners/dataflow/TestDataflowRunner.java | 322 +++++++++ .../testing/TestDataflowPipelineOptions.java | 28 - .../dataflow/testing/TestDataflowRunner.java | 325 --------- .../runners/dataflow/testing/package-info.java | 24 - .../runners/dataflow/DataflowMetricsTest.java | 1 - .../dataflow/DataflowPipelineJobTest.java | 1 - .../dataflow/TestDataflowRunnerTest.java | 652 ++++++++++++++++++ .../testing/TestDataflowRunnerTest.java | 655 ------------------- 10 files changed, 1002 insertions(+), 1035 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d305d6d3/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java index f36930f..15855f9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow; import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableList; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.runners.dataflow.testing.TestDataflowRunner; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsRegistrar; http://git-wip-us.apache.org/repos/asf/beam/blob/d305d6d3/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java new file mode 100644 index 0000000..a8acc76 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; + +/** + * A set of options used to configure the {@link TestPipeline}. + */ +public interface TestDataflowPipelineOptions extends TestPipelineOptions, DataflowPipelineOptions { +} http://git-wip-us.apache.org/repos/asf/beam/blob/d305d6d3/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java new file mode 100644 index 0000000..b81b487 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import static org.hamcrest.MatcherAssert.assertThat; + +import com.google.api.services.dataflow.model.JobMessage; +import com.google.api.services.dataflow.model.JobMetrics; +import com.google.api.services.dataflow.model.MetricUpdate; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Strings; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.util.MonitoringUtil; +import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link TestDataflowRunner} is a pipeline runner that wraps a + * {@link DataflowRunner} when running tests against the {@link TestPipeline}. + * + * @see TestPipeline + */ +public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { + private static final String TENTATIVE_COUNTER = "tentative"; + private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class); + + private final TestDataflowPipelineOptions options; + private final DataflowClient dataflowClient; + private final DataflowRunner runner; + private int expectedNumberOfAssertions = 0; + + TestDataflowRunner(TestDataflowPipelineOptions options, DataflowClient client) { + this.options = options; + this.dataflowClient = client; + this.runner = DataflowRunner.fromOptions(options); + } + + /** + * Constructs a runner from the provided options. + */ + public static TestDataflowRunner fromOptions(PipelineOptions options) { + TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class); + String tempLocation = Joiner.on("/").join( + dataflowOptions.getTempRoot(), + dataflowOptions.getJobName(), + "output", + "results"); + dataflowOptions.setTempLocation(tempLocation); + + return new TestDataflowRunner( + dataflowOptions, DataflowClient.create(options.as(DataflowPipelineOptions.class))); + } + + @VisibleForTesting + static TestDataflowRunner fromOptionsAndClient( + TestDataflowPipelineOptions options, DataflowClient client) { + return new TestDataflowRunner(options, client); + } + + @Override + public DataflowPipelineJob run(Pipeline pipeline) { + return run(pipeline, runner); + } + + DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) { + updatePAssertCount(pipeline); + + TestPipelineOptions testPipelineOptions = options.as(TestPipelineOptions.class); + final DataflowPipelineJob job; + job = runner.run(pipeline); + + LOG.info("Running Dataflow job {} with {} expected assertions.", + job.getJobId(), expectedNumberOfAssertions); + + assertThat(job, testPipelineOptions.getOnCreateMatcher()); + + final ErrorMonitorMessagesHandler messageHandler = + new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler()); + + try { + Optional<Boolean> result = Optional.absent(); + + if (options.isStreaming()) { + // In streaming, there are infinite retries, so rather than timeout + // we try to terminate early by polling and canceling if we see + // an error message + while (true) { + State state = job.waitUntilFinish(Duration.standardSeconds(3), messageHandler); + if (state != null && state.isTerminal()) { + break; + } + + if (messageHandler.hasSeenError()) { + if (!job.getState().isTerminal()) { + LOG.info("Cancelling Dataflow job {}", job.getJobId()); + job.cancel(); + } + break; + } + } + + // Whether we canceled or not, this gets the final state of the job or times out + State finalState = + job.waitUntilFinish( + Duration.standardSeconds(options.getTestTimeoutSeconds()), messageHandler); + + // Getting the final state timed out; it may not indicate a failure. + // This cancellation may be the second + if (finalState == null || finalState == State.RUNNING) { + LOG.info( + "Dataflow job {} took longer than {} seconds to complete, cancelling.", + job.getJobId(), + options.getTestTimeoutSeconds()); + job.cancel(); + } + + if (messageHandler.hasSeenError()) { + result = Optional.of(false); + } + } else { + job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler); + result = checkForPAssertSuccess(job); + } + + if (!result.isPresent()) { + if (options.isStreaming()) { + LOG.warn( + "Dataflow job {} did not output a success or failure metric." + + " In rare situations, some PAsserts may not have run." + + " This is a known limitation of Dataflow in streaming.", + job.getJobId()); + } else { + throw new IllegalStateException( + String.format( + "Dataflow job %s did not output a success or failure metric.", job.getJobId())); + } + } else if (!result.get()) { + throw new AssertionError( + Strings.isNullOrEmpty(messageHandler.getErrorMessage()) + ? String.format( + "Dataflow job %s terminated in state %s but did not return a failure reason.", + job.getJobId(), job.getState()) + : messageHandler.getErrorMessage()); + } else { + assertThat(job, testPipelineOptions.getOnSuccessMatcher()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } + return job; + } + + @VisibleForTesting + void updatePAssertCount(Pipeline pipeline) { + if (DataflowRunner.hasExperiment(options, "beam_fn_api")) { + // TODO[BEAM-1866]: FnAPI does not support metrics, so expect 0 assertions. + expectedNumberOfAssertions = 0; + } else { + expectedNumberOfAssertions = PAssert.countAsserts(pipeline); + } + } + + /** + * Check that PAssert expectations were met. + * + * <p>If the pipeline is not in a failed/cancelled state and no PAsserts were used within the + * pipeline, then this method will state that all PAsserts succeeded. + * + * @return Optional.of(false) if we are certain a PAssert or some other critical thing has failed, + * Optional.of(true) if we are certain all PAsserts passed, and Optional.absent() if the + * evidence is inconclusive. + */ + @VisibleForTesting + Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) throws IOException { + + // If the job failed, this is a definite failure. We only cancel jobs when they fail. + State state = job.getState(); + if (state == State.FAILED || state == State.CANCELLED) { + LOG.info("Dataflow job {} terminated in failure state {}", job.getJobId(), state); + return Optional.of(false); + } + + JobMetrics metrics = getJobMetrics(job); + if (metrics == null || metrics.getMetrics() == null) { + LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId()); + return Optional.absent(); + } + + int successes = 0; + int failures = 0; + for (MetricUpdate metric : metrics.getMetrics()) { + if (metric.getName() == null + || metric.getName().getContext() == null + || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) { + // Don't double count using the non-tentative version of the metric. + continue; + } + if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) { + successes += ((BigDecimal) metric.getScalar()).intValue(); + } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) { + failures += ((BigDecimal) metric.getScalar()).intValue(); + } + } + + if (failures > 0) { + LOG.info("Failure result for Dataflow job {}. Found {} success, {} failures out of " + + "{} expected assertions.", job.getJobId(), successes, failures, + expectedNumberOfAssertions); + return Optional.of(false); + } else if (successes >= expectedNumberOfAssertions) { + LOG.info( + "Success result for Dataflow job {}." + + " Found {} success, {} failures out of {} expected assertions.", + job.getJobId(), + successes, + failures, + expectedNumberOfAssertions); + return Optional.of(true); + } + + LOG.info( + "Inconclusive results for Dataflow job {}." + + " Found {} success, {} failures out of {} expected assertions.", + job.getJobId(), + successes, + failures, + expectedNumberOfAssertions); + return Optional.absent(); + } + + @Nullable + @VisibleForTesting + JobMetrics getJobMetrics(DataflowPipelineJob job) { + JobMetrics metrics = null; + try { + metrics = dataflowClient.getJobMetrics(job.getJobId()); + } catch (IOException e) { + LOG.warn("Failed to get job metrics: ", e); + } + return metrics; + } + + @Override + public String toString() { + return "TestDataflowRunner#" + options.getAppName(); + } + + /** + * Monitors job log output messages for errors. + * + * <p>Creates an error message representing the concatenation of all error messages seen. + */ + private static class ErrorMonitorMessagesHandler implements JobMessagesHandler { + private final DataflowPipelineJob job; + private final JobMessagesHandler messageHandler; + private final StringBuffer errorMessage; + private volatile boolean hasSeenError; + + private ErrorMonitorMessagesHandler( + DataflowPipelineJob job, JobMessagesHandler messageHandler) { + this.job = job; + this.messageHandler = messageHandler; + this.errorMessage = new StringBuffer(); + this.hasSeenError = false; + } + + @Override + public void process(List<JobMessage> messages) { + messageHandler.process(messages); + for (JobMessage message : messages) { + if (message.getMessageImportance() != null + && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) { + LOG.info("Dataflow job {} threw exception. Failure message was: {}", + job.getJobId(), message.getMessageText()); + errorMessage.append(message.getMessageText()); + hasSeenError = true; + } + } + } + + boolean hasSeenError() { + return hasSeenError; + } + + String getErrorMessage() { + return errorMessage.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d305d6d3/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java deleted file mode 100644 index 12f7b39..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.testing; - -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestPipelineOptions; - -/** - * A set of options used to configure the {@link TestPipeline}. - */ -public interface TestDataflowPipelineOptions extends TestPipelineOptions, DataflowPipelineOptions { -} http://git-wip-us.apache.org/repos/asf/beam/blob/d305d6d3/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java deleted file mode 100644 index ce91915..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.testing; - -import static org.hamcrest.MatcherAssert.assertThat; - -import com.google.api.services.dataflow.model.JobMessage; -import com.google.api.services.dataflow.model.JobMetrics; -import com.google.api.services.dataflow.model.MetricUpdate; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.base.Strings; -import java.io.IOException; -import java.math.BigDecimal; -import java.util.List; -import javax.annotation.Nullable; -import org.apache.beam.runners.dataflow.DataflowClient; -import org.apache.beam.runners.dataflow.DataflowPipelineJob; -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.runners.dataflow.util.MonitoringUtil; -import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult.State; -import org.apache.beam.sdk.PipelineRunner; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestPipelineOptions; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link TestDataflowRunner} is a pipeline runner that wraps a - * {@link DataflowRunner} when running tests against the {@link TestPipeline}. - * - * @see TestPipeline - */ -public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { - private static final String TENTATIVE_COUNTER = "tentative"; - private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class); - - private final TestDataflowPipelineOptions options; - private final DataflowClient dataflowClient; - private final DataflowRunner runner; - private int expectedNumberOfAssertions = 0; - - TestDataflowRunner(TestDataflowPipelineOptions options, DataflowClient client) { - this.options = options; - this.dataflowClient = client; - this.runner = DataflowRunner.fromOptions(options); - } - - /** - * Constructs a runner from the provided options. - */ - public static TestDataflowRunner fromOptions(PipelineOptions options) { - TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class); - String tempLocation = Joiner.on("/").join( - dataflowOptions.getTempRoot(), - dataflowOptions.getJobName(), - "output", - "results"); - dataflowOptions.setTempLocation(tempLocation); - - return new TestDataflowRunner( - dataflowOptions, DataflowClient.create(options.as(DataflowPipelineOptions.class))); - } - - @VisibleForTesting - static TestDataflowRunner fromOptionsAndClient( - TestDataflowPipelineOptions options, DataflowClient client) { - return new TestDataflowRunner(options, client); - } - - @Override - public DataflowPipelineJob run(Pipeline pipeline) { - return run(pipeline, runner); - } - - DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) { - updatePAssertCount(pipeline); - - TestPipelineOptions testPipelineOptions = options.as(TestPipelineOptions.class); - final DataflowPipelineJob job; - job = runner.run(pipeline); - - LOG.info("Running Dataflow job {} with {} expected assertions.", - job.getJobId(), expectedNumberOfAssertions); - - assertThat(job, testPipelineOptions.getOnCreateMatcher()); - - final ErrorMonitorMessagesHandler messageHandler = - new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler()); - - try { - Optional<Boolean> result = Optional.absent(); - - if (options.isStreaming()) { - // In streaming, there are infinite retries, so rather than timeout - // we try to terminate early by polling and canceling if we see - // an error message - while (true) { - State state = job.waitUntilFinish(Duration.standardSeconds(3), messageHandler); - if (state != null && state.isTerminal()) { - break; - } - - if (messageHandler.hasSeenError()) { - if (!job.getState().isTerminal()) { - LOG.info("Cancelling Dataflow job {}", job.getJobId()); - job.cancel(); - } - break; - } - } - - // Whether we canceled or not, this gets the final state of the job or times out - State finalState = - job.waitUntilFinish( - Duration.standardSeconds(options.getTestTimeoutSeconds()), messageHandler); - - // Getting the final state timed out; it may not indicate a failure. - // This cancellation may be the second - if (finalState == null || finalState == State.RUNNING) { - LOG.info( - "Dataflow job {} took longer than {} seconds to complete, cancelling.", - job.getJobId(), - options.getTestTimeoutSeconds()); - job.cancel(); - } - - if (messageHandler.hasSeenError()) { - result = Optional.of(false); - } - } else { - job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler); - result = checkForPAssertSuccess(job); - } - - if (!result.isPresent()) { - if (options.isStreaming()) { - LOG.warn( - "Dataflow job {} did not output a success or failure metric." - + " In rare situations, some PAsserts may not have run." - + " This is a known limitation of Dataflow in streaming.", - job.getJobId()); - } else { - throw new IllegalStateException( - String.format( - "Dataflow job %s did not output a success or failure metric.", job.getJobId())); - } - } else if (!result.get()) { - throw new AssertionError( - Strings.isNullOrEmpty(messageHandler.getErrorMessage()) - ? String.format( - "Dataflow job %s terminated in state %s but did not return a failure reason.", - job.getJobId(), job.getState()) - : messageHandler.getErrorMessage()); - } else { - assertThat(job, testPipelineOptions.getOnSuccessMatcher()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - return job; - } - - @VisibleForTesting - void updatePAssertCount(Pipeline pipeline) { - if (DataflowRunner.hasExperiment(options, "beam_fn_api")) { - // TODO[BEAM-1866]: FnAPI does not support metrics, so expect 0 assertions. - expectedNumberOfAssertions = 0; - } else { - expectedNumberOfAssertions = PAssert.countAsserts(pipeline); - } - } - - /** - * Check that PAssert expectations were met. - * - * <p>If the pipeline is not in a failed/cancelled state and no PAsserts were used within the - * pipeline, then this method will state that all PAsserts succeeded. - * - * @return Optional.of(false) if we are certain a PAssert or some other critical thing has failed, - * Optional.of(true) if we are certain all PAsserts passed, and Optional.absent() if the - * evidence is inconclusive. - */ - @VisibleForTesting - Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) throws IOException { - - // If the job failed, this is a definite failure. We only cancel jobs when they fail. - State state = job.getState(); - if (state == State.FAILED || state == State.CANCELLED) { - LOG.info("Dataflow job {} terminated in failure state {}", job.getJobId(), state); - return Optional.of(false); - } - - JobMetrics metrics = getJobMetrics(job); - if (metrics == null || metrics.getMetrics() == null) { - LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId()); - return Optional.absent(); - } - - int successes = 0; - int failures = 0; - for (MetricUpdate metric : metrics.getMetrics()) { - if (metric.getName() == null - || metric.getName().getContext() == null - || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) { - // Don't double count using the non-tentative version of the metric. - continue; - } - if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) { - successes += ((BigDecimal) metric.getScalar()).intValue(); - } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) { - failures += ((BigDecimal) metric.getScalar()).intValue(); - } - } - - if (failures > 0) { - LOG.info("Failure result for Dataflow job {}. Found {} success, {} failures out of " - + "{} expected assertions.", job.getJobId(), successes, failures, - expectedNumberOfAssertions); - return Optional.of(false); - } else if (successes >= expectedNumberOfAssertions) { - LOG.info( - "Success result for Dataflow job {}." - + " Found {} success, {} failures out of {} expected assertions.", - job.getJobId(), - successes, - failures, - expectedNumberOfAssertions); - return Optional.of(true); - } - - LOG.info( - "Inconclusive results for Dataflow job {}." - + " Found {} success, {} failures out of {} expected assertions.", - job.getJobId(), - successes, - failures, - expectedNumberOfAssertions); - return Optional.absent(); - } - - @Nullable - @VisibleForTesting - JobMetrics getJobMetrics(DataflowPipelineJob job) { - JobMetrics metrics = null; - try { - metrics = dataflowClient.getJobMetrics(job.getJobId()); - } catch (IOException e) { - LOG.warn("Failed to get job metrics: ", e); - } - return metrics; - } - - @Override - public String toString() { - return "TestDataflowRunner#" + options.getAppName(); - } - - /** - * Monitors job log output messages for errors. - * - * <p>Creates an error message representing the concatenation of all error messages seen. - */ - private static class ErrorMonitorMessagesHandler implements JobMessagesHandler { - private final DataflowPipelineJob job; - private final JobMessagesHandler messageHandler; - private final StringBuffer errorMessage; - private volatile boolean hasSeenError; - - private ErrorMonitorMessagesHandler( - DataflowPipelineJob job, JobMessagesHandler messageHandler) { - this.job = job; - this.messageHandler = messageHandler; - this.errorMessage = new StringBuffer(); - this.hasSeenError = false; - } - - @Override - public void process(List<JobMessage> messages) { - messageHandler.process(messages); - for (JobMessage message : messages) { - if (message.getMessageImportance() != null - && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) { - LOG.info("Dataflow job {} threw exception. Failure message was: {}", - job.getJobId(), message.getMessageText()); - errorMessage.append(message.getMessageText()); - hasSeenError = true; - } - } - } - - boolean hasSeenError() { - return hasSeenError; - } - - String getErrorMessage() { - return errorMessage.toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/d305d6d3/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java deleted file mode 100644 index 9683df0..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Provides utilities for integration testing and {@link - * org.apache.beam.sdk.testing.ValidatesRunner} tests of the Google Cloud Dataflow - * runner. - */ -package org.apache.beam.runners.dataflow.testing; http://git-wip-us.apache.org/repos/asf/beam/blob/d305d6d3/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index aabdd84..7e88300 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -37,7 +37,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.math.BigDecimal; -import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.metrics.MetricQueryResults; http://git-wip-us.apache.org/repos/asf/beam/blob/d305d6d3/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index f868a17..df894d2 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -48,7 +48,6 @@ import java.util.List; import java.util.NavigableMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.MonitoringUtil; import org.apache.beam.runners.dataflow.util.TimeUtil; import org.apache.beam.sdk.Pipeline; http://git-wip-us.apache.org/repos/asf/beam/blob/d305d6d3/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java new file mode 100644 index 0000000..bf15747 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java @@ -0,0 +1,652 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.services.dataflow.model.JobMessage; +import com.google.api.services.dataflow.model.JobMetrics; +import com.google.api.services.dataflow.model.MetricStructuredName; +import com.google.api.services.dataflow.model.MetricUpdate; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.dataflow.util.MonitoringUtil; +import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler; +import org.apache.beam.runners.dataflow.util.TimeUtil; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.SerializableMatcher; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.NoopPathValidator; +import org.apache.beam.sdk.util.Transport; +import org.apache.beam.sdk.values.PCollection; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** Tests for {@link TestDataflowRunner}. */ +@RunWith(JUnit4.class) +public class TestDataflowRunnerTest { + @Rule public ExpectedException expectedException = ExpectedException.none(); + @Mock private DataflowClient mockClient; + + private TestDataflowPipelineOptions options; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + + options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); + options.setAppName("TestAppName"); + options.setProject("test-project"); + options.setTempLocation("gs://test/temp/location"); + options.setTempRoot("gs://test"); + options.setGcpCredential(new TestCredential()); + options.setRunner(TestDataflowRunner.class); + options.setPathValidatorClass(NoopPathValidator.class); + } + + @Test + public void testToString() { + assertEquals("TestDataflowRunner#TestAppName", + TestDataflowRunner.fromOptions(options).toString()); + } + + @Test + public void testRunBatchJobThatSucceeds() throws Exception { + Pipeline p = Pipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); + assertEquals(mockJob, runner.run(p, mockRunner)); + } + + @Test + public void testRunBatchJobThatFails() throws Exception { + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.FAILED); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(false /* success */, false /* tentative */)); + try { + runner.run(p, mockRunner); + } catch (AssertionError expected) { + return; + } + // Note that fail throws an AssertionError which is why it is placed out here + // instead of inside the try-catch block. + fail("AssertionError expected"); + } + + @Test + public void testBatchPipelineFailsIfException() throws Exception { + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.RUNNING); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) + .thenAnswer(new Answer<State>() { + @Override + public State answer(InvocationOnMock invocation) { + JobMessage message = new JobMessage(); + message.setMessageText("FooException"); + message.setTime(TimeUtil.toCloudTime(Instant.now())); + message.setMessageImportance("JOB_MESSAGE_ERROR"); + ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1]) + .process(Arrays.asList(message)); + return State.CANCELLED; + } + }); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + try { + runner.run(p, mockRunner); + } catch (AssertionError expected) { + assertThat(expected.getMessage(), containsString("FooException")); + verify(mockJob, never()).cancel(); + return; + } + // Note that fail throws an AssertionError which is why it is placed out here + // instead of inside the try-catch block. + fail("AssertionError expected"); + } + + /** + * A streaming job that terminates with no error messages is a success. + */ + @Test + public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) + .thenReturn(State.DONE); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + runner.run(p, mockRunner); + } + + @Test + public void testRunStreamingJobNotUsingPAssertThatSucceeds() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + p.apply(Create.of(1, 2, 3)); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) + .thenReturn(State.DONE); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockStreamingMetricResponse(ImmutableMap.<String, BigDecimal>of())); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + runner.run(p, mockRunner); + } + + /** + * Tests that a streaming job with a false {@link PAssert} fails. + * + * <p>Currently, this failure is indistinguishable from a non-{@link PAssert} failure, because it + * is detected only by failure job messages. With fuller metric support, this can detect a PAssert + * failure via metrics and raise an {@link AssertionError} in just that case. + */ + @Test + public void testRunStreamingJobThatFails() throws Exception { + testStreamingPipelineFailsIfException(); + } + + private JobMetrics generateMockMetricResponse(boolean success, boolean tentative) + throws Exception { + List<MetricUpdate> metrics = generateMockMetrics(success, tentative); + return buildJobMetrics(metrics); + } + + private List<MetricUpdate> generateMockMetrics(boolean success, boolean tentative) { + MetricStructuredName name = new MetricStructuredName(); + name.setName(success ? "PAssertSuccess" : "PAssertFailure"); + name.setContext( + tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of()); + + MetricUpdate metric = new MetricUpdate(); + metric.setName(name); + metric.setScalar(BigDecimal.ONE); + return Lists.newArrayList(metric); + } + + private JobMetrics generateMockStreamingMetricResponse(Map<String, + BigDecimal> metricMap) throws IOException { + return buildJobMetrics(generateMockStreamingMetrics(metricMap)); + } + + private List<MetricUpdate> generateMockStreamingMetrics(Map<String, BigDecimal> metricMap) { + List<MetricUpdate> metrics = Lists.newArrayList(); + for (Map.Entry<String, BigDecimal> entry : metricMap.entrySet()) { + MetricStructuredName name = new MetricStructuredName(); + name.setName(entry.getKey()); + + MetricUpdate metric = new MetricUpdate(); + metric.setName(name); + metric.setScalar(entry.getValue()); + metrics.add(metric); + } + return metrics; + } + + private JobMetrics buildJobMetrics(List<MetricUpdate> metricList) { + JobMetrics jobMetrics = new JobMetrics(); + jobMetrics.setMetrics(metricList); + // N.B. Setting the factory is necessary in order to get valid JSON. + jobMetrics.setFactory(Transport.getJsonFactory()); + return jobMetrics; + } + + /** + * Tests that a tentative {@code true} from metrics indicates that every {@link PAssert} has + * succeeded. + */ + @Test + public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception { + DataflowPipelineJob job = + spy(new DataflowPipelineJob(mockClient, "test-job", options, null)); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn(buildJobMetrics(generateMockMetrics(true /* success */, true /* tentative */))); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + doReturn(State.DONE).when(job).getState(); + assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(true))); + } + + /** + * Tests that when we just see a tentative failure for a {@link PAssert} it is considered a + * conclusive failure. + */ + @Test + public void testCheckingForSuccessWhenPAssertFails() throws Exception { + DataflowPipelineJob job = + spy(new DataflowPipelineJob(mockClient, "test-job", options, null)); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn( + buildJobMetrics(generateMockMetrics(false /* success */, true /* tentative */))); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + doReturn(State.DONE).when(job).getState(); + assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false))); + } + + @Test + public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception { + DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null)); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn( + buildJobMetrics(generateMockMetrics(true /* success */, false /* tentative */))); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + runner.updatePAssertCount(p); + doReturn(State.RUNNING).when(job).getState(); + assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.<Boolean>absent())); + } + + /** + * Tests that if a streaming pipeline terminates with FAIL that the check for PAssert + * success is a conclusive failure. + */ + @Test + public void testStreamingPipelineFailsIfServiceFails() throws Exception { + DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null)); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + doReturn(State.FAILED).when(job).getState(); + assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false))); + } + + /** + * Tests that if a streaming pipeline crash loops for a non-assertion reason that the test run + * throws an {@link AssertionError}. + * + * <p>This is a known limitation/bug of the runner that it does not distinguish the two modes of + * failure. + */ + @Test + public void testStreamingPipelineFailsIfException() throws Exception { + options.setStreaming(true); + Pipeline pipeline = TestPipeline.create(options); + PCollection<Integer> pc = pipeline.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.RUNNING); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) + .thenAnswer(new Answer<State>() { + @Override + public State answer(InvocationOnMock invocation) { + JobMessage message = new JobMessage(); + message.setMessageText("FooException"); + message.setTime(TimeUtil.toCloudTime(Instant.now())); + message.setMessageImportance("JOB_MESSAGE_ERROR"); + ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1]) + .process(Arrays.asList(message)); + return State.CANCELLED; + } + }); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + + try { + runner.run(pipeline, mockRunner); + } catch (AssertionError exc) { + return; + } + fail("AssertionError expected"); + } + + @Test + public void testGetJobMetricsThatSucceeds() throws Exception { + DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null)); + Pipeline p = TestPipeline.create(options); + p.apply(Create.of(1, 2, 3)); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + JobMetrics metrics = runner.getJobMetrics(job); + + assertEquals(1, metrics.getMetrics().size()); + assertEquals(generateMockMetrics(true /* success */, true /* tentative */), + metrics.getMetrics()); + } + + @Test + public void testGetJobMetricsThatFailsForException() throws Exception { + DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null)); + Pipeline p = TestPipeline.create(options); + p.apply(Create.of(1, 2, 3)); + + when(mockClient.getJobMetrics(anyString())).thenThrow(new IOException()); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + assertNull(runner.getJobMetrics(job)); + } + + @Test + public void testBatchOnCreateMatcher() throws Exception { + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); + runner.run(p, mockRunner); + } + + @Test + public void testStreamingOnCreateMatcher() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); + + when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) + .thenReturn(State.DONE); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */ + )); + runner.run(p, mockRunner); + } + + @Test + public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception { + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); + runner.run(p, mockRunner); + } + + /** + * Tests that when a streaming pipeline terminates and doesn't fail due to {@link PAssert} that + * the {@link TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is + * invoked. + */ + @Test + public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); + + when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) + .thenReturn(State.DONE); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); + runner.run(p, mockRunner); + } + + @Test + public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception { + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.FAILED); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher()); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */)); + try { + runner.run(p, mockRunner); + } catch (AssertionError expected) { + verify(mockJob, Mockito.times(1)).waitUntilFinish( + any(Duration.class), any(JobMessagesHandler.class)); + return; + } + fail("Expected an exception on pipeline failure."); + } + + /** + * Tests that when a streaming pipeline terminates in FAIL that the {@link + * TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is not + * invoked. + */ + @Test + public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.FAILED); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher()); + + when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) + .thenReturn(State.FAILED); + + runner.run(p, mockRunner); + // If the onSuccessMatcher were invoked, it would have crashed here. + } + + static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements + SerializableMatcher<PipelineResult> { + private final DataflowPipelineJob mockJob; + private final int called; + + public TestSuccessMatcher(DataflowPipelineJob job, int times) { + this.mockJob = job; + this.called = times; + } + + @Override + public boolean matches(Object o) { + if (!(o instanceof PipelineResult)) { + fail(String.format("Expected PipelineResult but received %s", o)); + } + try { + verify(mockJob, Mockito.times(called)).waitUntilFinish( + any(Duration.class), any(JobMessagesHandler.class)); + } catch (IOException | InterruptedException e) { + throw new AssertionError(e); + } + assertSame(mockJob, o); + return true; + } + + @Override + public void describeTo(Description description) { + } + } + + static class TestFailureMatcher extends BaseMatcher<PipelineResult> implements + SerializableMatcher<PipelineResult> { + @Override + public boolean matches(Object o) { + fail("OnSuccessMatcher should not be called on pipeline failure."); + return false; + } + + @Override + public void describeTo(Description description) { + } + } +}
