Move TestDataflowRunner into dataflow package

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b46e7b9b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b46e7b9b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b46e7b9b

Branch: refs/heads/master
Commit: b46e7b9bb113a5b1b75c7aca9de8cbabdd75ff6f
Parents: eb043d0
Author: Kenneth Knowles <[email protected]>
Authored: Sat May 6 05:22:03 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Sat May 6 13:49:52 2017 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineRegistrar.java     |   1 -
 .../dataflow/TestDataflowPipelineOptions.java   |  28 +
 .../runners/dataflow/TestDataflowRunner.java    | 322 +++++++++
 .../testing/TestDataflowPipelineOptions.java    |  28 -
 .../dataflow/testing/TestDataflowRunner.java    | 325 ---------
 .../runners/dataflow/testing/package-info.java  |  24 -
 .../runners/dataflow/DataflowMetricsTest.java   |   1 -
 .../dataflow/DataflowPipelineJobTest.java       |   1 -
 .../dataflow/TestDataflowRunnerTest.java        | 652 ++++++++++++++++++
 .../testing/TestDataflowRunnerTest.java         | 655 -------------------
 10 files changed, 1002 insertions(+), 1035 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
index f36930f..15855f9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow;
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.testing.TestDataflowRunner;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java
new file mode 100644
index 0000000..a8acc76
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowPipelineOptions.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/**
+ * A set of options used to configure the {@link TestPipeline}.
+ */
+public interface TestDataflowPipelineOptions extends TestPipelineOptions, DataflowPipelineOptions {
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
new file mode 100644
index 0000000..b81b487
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TestDataflowRunner} is a pipeline runner that wraps a
+ * {@link DataflowRunner} when running tests against the {@link TestPipeline}.
+ *
+ * @see TestPipeline
+ */
+public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
+  private static final String TENTATIVE_COUNTER = "tentative";
+  private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);
+
+  private final TestDataflowPipelineOptions options;
+  private final DataflowClient dataflowClient;
+  private final DataflowRunner runner;
+  private int expectedNumberOfAssertions = 0;
+
+  TestDataflowRunner(TestDataflowPipelineOptions options, DataflowClient client) {
+    this.options = options;
+    this.dataflowClient = client;
+    this.runner = DataflowRunner.fromOptions(options);
+  }
+
+  /**
+   * Constructs a runner from the provided options.
+   */
+  public static TestDataflowRunner fromOptions(PipelineOptions options) {
+    TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
+    String tempLocation = Joiner.on("/").join(
+        dataflowOptions.getTempRoot(),
+        dataflowOptions.getJobName(),
+        "output",
+        "results");
+    dataflowOptions.setTempLocation(tempLocation);
+
+    return new TestDataflowRunner(
+        dataflowOptions, DataflowClient.create(options.as(DataflowPipelineOptions.class)));
+  }
+
+  @VisibleForTesting
+  static TestDataflowRunner fromOptionsAndClient(
+      TestDataflowPipelineOptions options, DataflowClient client) {
+    return new TestDataflowRunner(options, client);
+  }
+
+  @Override
+  public DataflowPipelineJob run(Pipeline pipeline) {
+    return run(pipeline, runner);
+  }
+
+  DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
+    updatePAssertCount(pipeline);
+
+    TestPipelineOptions testPipelineOptions = options.as(TestPipelineOptions.class);
+    final DataflowPipelineJob job;
+    job = runner.run(pipeline);
+
+    LOG.info("Running Dataflow job {} with {} expected assertions.",
+        job.getJobId(), expectedNumberOfAssertions);
+
+    assertThat(job, testPipelineOptions.getOnCreateMatcher());
+
+    final ErrorMonitorMessagesHandler messageHandler =
+        new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler());
+
+    try {
+      Optional<Boolean> result = Optional.absent();
+
+      if (options.isStreaming()) {
+        // In streaming, there are infinite retries, so rather than timeout
+        // we try to terminate early by polling and canceling if we see
+        // an error message
+        while (true) {
+          State state = job.waitUntilFinish(Duration.standardSeconds(3), messageHandler);
+          if (state != null && state.isTerminal()) {
+            break;
+          }
+
+          if (messageHandler.hasSeenError()) {
+            if (!job.getState().isTerminal()) {
+              LOG.info("Cancelling Dataflow job {}", job.getJobId());
+              job.cancel();
+            }
+            break;
+          }
+        }
+
+        // Whether we canceled or not, this gets the final state of the job or times out
+        State finalState =
+            job.waitUntilFinish(
+                Duration.standardSeconds(options.getTestTimeoutSeconds()), messageHandler);
+
+        // Getting the final state timed out; it may not indicate a failure.
+        // This cancellation may be the second
+        if (finalState == null || finalState == State.RUNNING) {
+          LOG.info(
+              "Dataflow job {} took longer than {} seconds to complete, cancelling.",
+              job.getJobId(),
+              options.getTestTimeoutSeconds());
+          job.cancel();
+        }
+
+        if (messageHandler.hasSeenError()) {
+          result = Optional.of(false);
+        }
+      } else {
+        job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler);
+        result = checkForPAssertSuccess(job);
+      }
+
+      if (!result.isPresent()) {
+        if (options.isStreaming()) {
+          LOG.warn(
+              "Dataflow job {} did not output a success or failure metric."
+                  + " In rare situations, some PAsserts may not have run."
+                  + " This is a known limitation of Dataflow in streaming.",
+              job.getJobId());
+        } else {
+          throw new IllegalStateException(
+              String.format(
+                  "Dataflow job %s did not output a success or failure metric.", job.getJobId()));
+        }
+      } else if (!result.get()) {
+        throw new AssertionError(
+            Strings.isNullOrEmpty(messageHandler.getErrorMessage())
+                ? String.format(
+                    "Dataflow job %s terminated in state %s but did not return a failure reason.",
+                    job.getJobId(), job.getState())
+                : messageHandler.getErrorMessage());
+      } else {
+        assertThat(job, testPipelineOptions.getOnSuccessMatcher());
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return job;
+  }
+
+  @VisibleForTesting
+  void updatePAssertCount(Pipeline pipeline) {
+    if (DataflowRunner.hasExperiment(options, "beam_fn_api")) {
+      // TODO[BEAM-1866]: FnAPI does not support metrics, so expect 0 assertions.
+      expectedNumberOfAssertions = 0;
+    } else {
+      expectedNumberOfAssertions = PAssert.countAsserts(pipeline);
+    }
+  }
+
+  /**
+   * Check that PAssert expectations were met.
+   *
+   * <p>If the pipeline is not in a failed/cancelled state and no PAsserts were used within the
+   * pipeline, then this method will state that all PAsserts succeeded.
+   *
+   * @return Optional.of(false) if we are certain a PAssert or some other critical thing has failed,
+   *     Optional.of(true) if we are certain all PAsserts passed, and Optional.absent() if the
+   *     evidence is inconclusive.
+   */
+  @VisibleForTesting
+  Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) throws IOException {
+
+    // If the job failed, this is a definite failure. We only cancel jobs when they fail.
+    State state = job.getState();
+    if (state == State.FAILED || state == State.CANCELLED) {
+      LOG.info("Dataflow job {} terminated in failure state {}", job.getJobId(), state);
+      return Optional.of(false);
+    }
+
+    JobMetrics metrics = getJobMetrics(job);
+    if (metrics == null || metrics.getMetrics() == null) {
+      LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
+      return Optional.absent();
+    }
+
+    int successes = 0;
+    int failures = 0;
+    for (MetricUpdate metric : metrics.getMetrics()) {
+      if (metric.getName() == null
+          || metric.getName().getContext() == null
+          || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
+        // Don't double count using the non-tentative version of the metric.
+        continue;
+      }
+      if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
+        successes += ((BigDecimal) metric.getScalar()).intValue();
+      } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
+        failures += ((BigDecimal) metric.getScalar()).intValue();
+      }
+    }
+
+    if (failures > 0) {
+      LOG.info("Failure result for Dataflow job {}. Found {} success, {} failures out of "
+          + "{} expected assertions.", job.getJobId(), successes, failures,
+          expectedNumberOfAssertions);
+      return Optional.of(false);
+    } else if (successes >= expectedNumberOfAssertions) {
+      LOG.info(
+          "Success result for Dataflow job {}."
+              + " Found {} success, {} failures out of {} expected assertions.",
+          job.getJobId(),
+          successes,
+          failures,
+          expectedNumberOfAssertions);
+      return Optional.of(true);
+    }
+
+    LOG.info(
+        "Inconclusive results for Dataflow job {}."
+            + " Found {} success, {} failures out of {} expected assertions.",
+        job.getJobId(),
+        successes,
+        failures,
+        expectedNumberOfAssertions);
+    return Optional.absent();
+  }
+
+  @Nullable
+  @VisibleForTesting
+  JobMetrics getJobMetrics(DataflowPipelineJob job) {
+    JobMetrics metrics = null;
+    try {
+      metrics = dataflowClient.getJobMetrics(job.getJobId());
+    } catch (IOException e) {
+      LOG.warn("Failed to get job metrics: ", e);
+    }
+    return metrics;
+  }
+
+  @Override
+  public String toString() {
+    return "TestDataflowRunner#" + options.getAppName();
+  }
+
+  /**
+   * Monitors job log output messages for errors.
+   *
+   * <p>Creates an error message representing the concatenation of all error messages seen.
+   */
+  private static class ErrorMonitorMessagesHandler implements JobMessagesHandler {
+    private final DataflowPipelineJob job;
+    private final JobMessagesHandler messageHandler;
+    private final StringBuffer errorMessage;
+    private volatile boolean hasSeenError;
+
+    private ErrorMonitorMessagesHandler(
+        DataflowPipelineJob job, JobMessagesHandler messageHandler) {
+      this.job = job;
+      this.messageHandler = messageHandler;
+      this.errorMessage = new StringBuffer();
+      this.hasSeenError = false;
+    }
+
+    @Override
+    public void process(List<JobMessage> messages) {
+      messageHandler.process(messages);
+      for (JobMessage message : messages) {
+        if (message.getMessageImportance() != null
+            && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
+          LOG.info("Dataflow job {} threw exception. Failure message was: {}",
+              job.getJobId(), message.getMessageText());
+          errorMessage.append(message.getMessageText());
+          hasSeenError = true;
+        }
+      }
+    }
+
+    boolean hasSeenError() {
+      return hasSeenError;
+    }
+
+    String getErrorMessage() {
+      return errorMessage.toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
deleted file mode 100644
index 12f7b39..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineOptions.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.testing;
-
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-
-/**
- * A set of options used to configure the {@link TestPipeline}.
- */
-public interface TestDataflowPipelineOptions extends TestPipelineOptions, DataflowPipelineOptions {
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
deleted file mode 100644
index ce91915..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.testing;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.DataflowClient;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link TestDataflowRunner} is a pipeline runner that wraps a
- * {@link DataflowRunner} when running tests against the {@link TestPipeline}.
- *
- * @see TestPipeline
- */
-public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
-  private static final String TENTATIVE_COUNTER = "tentative";
-  private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class);
-
-  private final TestDataflowPipelineOptions options;
-  private final DataflowClient dataflowClient;
-  private final DataflowRunner runner;
-  private int expectedNumberOfAssertions = 0;
-
-  TestDataflowRunner(TestDataflowPipelineOptions options, DataflowClient client) {
-    this.options = options;
-    this.dataflowClient = client;
-    this.runner = DataflowRunner.fromOptions(options);
-  }
-
-  /**
-   * Constructs a runner from the provided options.
-   */
-  public static TestDataflowRunner fromOptions(PipelineOptions options) {
-    TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
-    String tempLocation = Joiner.on("/").join(
-        dataflowOptions.getTempRoot(),
-        dataflowOptions.getJobName(),
-        "output",
-        "results");
-    dataflowOptions.setTempLocation(tempLocation);
-
-    return new TestDataflowRunner(
-        dataflowOptions, DataflowClient.create(options.as(DataflowPipelineOptions.class)));
-  }
-
-  @VisibleForTesting
-  static TestDataflowRunner fromOptionsAndClient(
-      TestDataflowPipelineOptions options, DataflowClient client) {
-    return new TestDataflowRunner(options, client);
-  }
-
-  @Override
-  public DataflowPipelineJob run(Pipeline pipeline) {
-    return run(pipeline, runner);
-  }
-
-  DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
-    updatePAssertCount(pipeline);
-
-    TestPipelineOptions testPipelineOptions = options.as(TestPipelineOptions.class);
-    final DataflowPipelineJob job;
-    job = runner.run(pipeline);
-
-    LOG.info("Running Dataflow job {} with {} expected assertions.",
-        job.getJobId(), expectedNumberOfAssertions);
-
-    assertThat(job, testPipelineOptions.getOnCreateMatcher());
-
-    final ErrorMonitorMessagesHandler messageHandler =
-        new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler());
-
-    try {
-      Optional<Boolean> result = Optional.absent();
-
-      if (options.isStreaming()) {
-        // In streaming, there are infinite retries, so rather than timeout
-        // we try to terminate early by polling and canceling if we see
-        // an error message
-        while (true) {
-          State state = job.waitUntilFinish(Duration.standardSeconds(3), messageHandler);
-          if (state != null && state.isTerminal()) {
-            break;
-          }
-
-          if (messageHandler.hasSeenError()) {
-            if (!job.getState().isTerminal()) {
-              LOG.info("Cancelling Dataflow job {}", job.getJobId());
-              job.cancel();
-            }
-            break;
-          }
-        }
-
-        // Whether we canceled or not, this gets the final state of the job or times out
-        State finalState =
-            job.waitUntilFinish(
-                Duration.standardSeconds(options.getTestTimeoutSeconds()), messageHandler);
-
-        // Getting the final state timed out; it may not indicate a failure.
-        // This cancellation may be the second
-        if (finalState == null || finalState == State.RUNNING) {
-          LOG.info(
-              "Dataflow job {} took longer than {} seconds to complete, cancelling.",
-              job.getJobId(),
-              options.getTestTimeoutSeconds());
-          job.cancel();
-        }
-
-        if (messageHandler.hasSeenError()) {
-          result = Optional.of(false);
-        }
-      } else {
-        job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler);
-        result = checkForPAssertSuccess(job);
-      }
-
-      if (!result.isPresent()) {
-        if (options.isStreaming()) {
-          LOG.warn(
-              "Dataflow job {} did not output a success or failure metric."
-                  + " In rare situations, some PAsserts may not have run."
-                  + " This is a known limitation of Dataflow in streaming.",
-              job.getJobId());
-        } else {
-          throw new IllegalStateException(
-              String.format(
-                  "Dataflow job %s did not output a success or failure metric.", job.getJobId()));
-        }
-      } else if (!result.get()) {
-        throw new AssertionError(
-            Strings.isNullOrEmpty(messageHandler.getErrorMessage())
-                ? String.format(
-                    "Dataflow job %s terminated in state %s but did not return a failure reason.",
-                    job.getJobId(), job.getState())
-                : messageHandler.getErrorMessage());
-      } else {
-        assertThat(job, testPipelineOptions.getOnSuccessMatcher());
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return job;
-  }
-
-  @VisibleForTesting
-  void updatePAssertCount(Pipeline pipeline) {
-    if (DataflowRunner.hasExperiment(options, "beam_fn_api")) {
-      // TODO[BEAM-1866]: FnAPI does not support metrics, so expect 0 assertions.
-      expectedNumberOfAssertions = 0;
-    } else {
-      expectedNumberOfAssertions = PAssert.countAsserts(pipeline);
-    }
-  }
-
-  /**
-   * Check that PAssert expectations were met.
-   *
-   * <p>If the pipeline is not in a failed/cancelled state and no PAsserts were used within the
-   * pipeline, then this method will state that all PAsserts succeeded.
-   *
-   * @return Optional.of(false) if we are certain a PAssert or some other critical thing has failed,
-   *     Optional.of(true) if we are certain all PAsserts passed, and Optional.absent() if the
-   *     evidence is inconclusive.
-   */
-  @VisibleForTesting
-  Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) throws IOException {
-
-    // If the job failed, this is a definite failure. We only cancel jobs when they fail.
-    State state = job.getState();
-    if (state == State.FAILED || state == State.CANCELLED) {
-      LOG.info("Dataflow job {} terminated in failure state {}", job.getJobId(), state);
-      return Optional.of(false);
-    }
-
-    JobMetrics metrics = getJobMetrics(job);
-    if (metrics == null || metrics.getMetrics() == null) {
-      LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
-      return Optional.absent();
-    }
-
-    int successes = 0;
-    int failures = 0;
-    for (MetricUpdate metric : metrics.getMetrics()) {
-      if (metric.getName() == null
-          || metric.getName().getContext() == null
-          || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
-        // Don't double count using the non-tentative version of the metric.
-        continue;
-      }
-      if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
-        successes += ((BigDecimal) metric.getScalar()).intValue();
-      } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
-        failures += ((BigDecimal) metric.getScalar()).intValue();
-      }
-    }
-
-    if (failures > 0) {
-      LOG.info("Failure result for Dataflow job {}. Found {} success, {} failures out of "
-          + "{} expected assertions.", job.getJobId(), successes, failures,
-          expectedNumberOfAssertions);
-      return Optional.of(false);
-    } else if (successes >= expectedNumberOfAssertions) {
-      LOG.info(
-          "Success result for Dataflow job {}."
-              + " Found {} success, {} failures out of {} expected assertions.",
-          job.getJobId(),
-          successes,
-          failures,
-          expectedNumberOfAssertions);
-      return Optional.of(true);
-    }
-
-    LOG.info(
-        "Inconclusive results for Dataflow job {}."
-            + " Found {} success, {} failures out of {} expected assertions.",
-        job.getJobId(),
-        successes,
-        failures,
-        expectedNumberOfAssertions);
-    return Optional.absent();
-  }
-
-  @Nullable
-  @VisibleForTesting
-  JobMetrics getJobMetrics(DataflowPipelineJob job) {
-    JobMetrics metrics = null;
-    try {
-      metrics = dataflowClient.getJobMetrics(job.getJobId());
-    } catch (IOException e) {
-      LOG.warn("Failed to get job metrics: ", e);
-    }
-    return metrics;
-  }
-
-  @Override
-  public String toString() {
-    return "TestDataflowRunner#" + options.getAppName();
-  }
-
-  /**
-   * Monitors job log output messages for errors.
-   *
-   * <p>Creates an error message representing the concatenation of all error messages seen.
-   */
-  private static class ErrorMonitorMessagesHandler implements JobMessagesHandler {
-    private final DataflowPipelineJob job;
-    private final JobMessagesHandler messageHandler;
-    private final StringBuffer errorMessage;
-    private volatile boolean hasSeenError;
-
-    private ErrorMonitorMessagesHandler(
-        DataflowPipelineJob job, JobMessagesHandler messageHandler) {
-      this.job = job;
-      this.messageHandler = messageHandler;
-      this.errorMessage = new StringBuffer();
-      this.hasSeenError = false;
-    }
-
-    @Override
-    public void process(List<JobMessage> messages) {
-      messageHandler.process(messages);
-      for (JobMessage message : messages) {
-        if (message.getMessageImportance() != null
-            && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
-          LOG.info("Dataflow job {} threw exception. Failure message was: {}",
-              job.getJobId(), message.getMessageText());
-          errorMessage.append(message.getMessageText());
-          hasSeenError = true;
-        }
-      }
-    }
-
-    boolean hasSeenError() {
-      return hasSeenError;
-    }
-
-    String getErrorMessage() {
-      return errorMessage.toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java
deleted file mode 100644
index 9683df0..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Provides utilities for integration testing and {@link
- * org.apache.beam.sdk.testing.ValidatesRunner} tests of the Google Cloud Dataflow
- * runner.
- */
-package org.apache.beam.runners.dataflow.testing;

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
index aabdd84..7e88300 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
@@ -37,7 +37,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.math.BigDecimal;
-import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.metrics.MetricQueryResults;

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index f868a17..df894d2 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -48,7 +48,6 @@ import java.util.List;
 import java.util.NavigableMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.runners.dataflow.util.TimeUtil;
 import org.apache.beam.sdk.Pipeline;

http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
new file mode 100644
index 0000000..bf15747
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
+import org.apache.beam.runners.dataflow.util.TimeUtil;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SerializableMatcher;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/** Tests for {@link TestDataflowRunner}. */
+@RunWith(JUnit4.class)
+public class TestDataflowRunnerTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+  @Mock private DataflowClient mockClient;
+
+  private TestDataflowPipelineOptions options;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+
+    options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+    options.setAppName("TestAppName");
+    options.setProject("test-project");
+    options.setTempLocation("gs://test/temp/location");
+    options.setTempRoot("gs://test");
+    options.setGcpCredential(new TestCredential());
+    options.setRunner(TestDataflowRunner.class);
+    options.setPathValidatorClass(NoopPathValidator.class);
+  }
+
+  @Test
+  public void testToString() {
+    assertEquals("TestDataflowRunner#TestAppName",
+        TestDataflowRunner.fromOptions(options).toString());
+  }
+
+  @Test
+  public void testRunBatchJobThatSucceeds() throws Exception {
+    Pipeline p = Pipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */));
+    assertEquals(mockJob, runner.run(p, mockRunner));
+  }
+
+  @Test
+  public void testRunBatchJobThatFails() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.FAILED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(false /* success */, false /* 
tentative */));
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      return;
+    }
+    // Note that fail throws an AssertionError, which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+
+  @Test
+  public void testBatchPipelineFailsIfException() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+    when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
+        .thenAnswer(new Answer<State>() {
+          @Override
+          public State answer(InvocationOnMock invocation) {
+            JobMessage message = new JobMessage();
+            message.setMessageText("FooException");
+            message.setTime(TimeUtil.toCloudTime(Instant.now()));
+            message.setMessageImportance("JOB_MESSAGE_ERROR");
+            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
+                .process(Arrays.asList(message));
+            return State.CANCELLED;
+          }
+        });
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(false /* success */, true /* 
tentative */));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      assertThat(expected.getMessage(), containsString("FooException"));
+      verify(mockJob, never()).cancel();
+      return;
+    }
+    // Note that fail throws an AssertionError, which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+
+  /**
+   * A streaming job that terminates with no error messages is a success.
+   */
+  @Test
+  public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
+        .thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testRunStreamingJobNotUsingPAssertThatSucceeds() throws 
Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
+        .thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockStreamingMetricResponse(ImmutableMap.<String, 
BigDecimal>of()));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    runner.run(p, mockRunner);
+  }
+
+  /**
+   * Tests that a streaming job with a false {@link PAssert} fails.
+   *
+   * <p>Currently, this failure is indistinguishable from a non-{@link PAssert} failure, because it
+   * is detected only by failure job messages. With fuller metric support, this can detect a PAssert
+   * failure via metrics and raise an {@link AssertionError} in just that case.
+   */
+  @Test
+  public void testRunStreamingJobThatFails() throws Exception {
+    testStreamingPipelineFailsIfException();
+  }
+
+  private JobMetrics generateMockMetricResponse(boolean success, boolean 
tentative)
+      throws Exception {
+    List<MetricUpdate> metrics = generateMockMetrics(success, tentative);
+    return buildJobMetrics(metrics);
+  }
+
+  private List<MetricUpdate> generateMockMetrics(boolean success, boolean 
tentative) {
+    MetricStructuredName name = new MetricStructuredName();
+    name.setName(success ? "PAssertSuccess" : "PAssertFailure");
+    name.setContext(
+        tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, 
String>of());
+
+    MetricUpdate metric = new MetricUpdate();
+    metric.setName(name);
+    metric.setScalar(BigDecimal.ONE);
+    return Lists.newArrayList(metric);
+  }
+
+  private JobMetrics generateMockStreamingMetricResponse(Map<String,
+      BigDecimal> metricMap) throws IOException {
+    return buildJobMetrics(generateMockStreamingMetrics(metricMap));
+  }
+
+  private List<MetricUpdate> generateMockStreamingMetrics(Map<String, 
BigDecimal> metricMap) {
+    List<MetricUpdate> metrics = Lists.newArrayList();
+    for (Map.Entry<String, BigDecimal> entry : metricMap.entrySet()) {
+      MetricStructuredName name = new MetricStructuredName();
+      name.setName(entry.getKey());
+
+      MetricUpdate metric = new MetricUpdate();
+      metric.setName(name);
+      metric.setScalar(entry.getValue());
+      metrics.add(metric);
+    }
+    return metrics;
+  }
+
+  private JobMetrics buildJobMetrics(List<MetricUpdate> metricList) {
+    JobMetrics jobMetrics = new JobMetrics();
+    jobMetrics.setMetrics(metricList);
+    // N.B. Setting the factory is necessary in order to get valid JSON.
+    jobMetrics.setFactory(Transport.getJsonFactory());
+    return jobMetrics;
+  }
+
+  /**
+   * Tests that a tentative {@code true} from metrics indicates that every {@link PAssert} has
+   * succeeded.
+   */
+  @Test
+  public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(buildJobMetrics(generateMockMetrics(true /* success */, 
true /* tentative */)));
+
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    doReturn(State.DONE).when(job).getState();
+    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(true)));
+  }
+
+  /**
+   * Tests that when we just see a tentative failure for a {@link PAssert} it is considered a
+   * conclusive failure.
+   */
+  @Test
+  public void testCheckingForSuccessWhenPAssertFails() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(
+            buildJobMetrics(generateMockMetrics(false /* success */, true /* 
tentative */)));
+
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    doReturn(State.DONE).when(job).getState();
+    assertThat(runner.checkForPAssertSuccess(job), 
equalTo(Optional.of(false)));
+  }
+
+  @Test
+  public void testCheckingForSuccessSkipsNonTentativeMetrics() throws 
Exception {
+    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, 
"test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(
+            buildJobMetrics(generateMockMetrics(true /* success */, false /* 
tentative */)));
+
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    runner.updatePAssertCount(p);
+    doReturn(State.RUNNING).when(job).getState();
+    assertThat(runner.checkForPAssertSuccess(job), 
equalTo(Optional.<Boolean>absent()));
+  }
+
+  /**
+   * Tests that if a streaming pipeline terminates in the FAILED state, the check for
+   * PAssert success is a conclusive failure.
+   */
+  @Test
+  public void testStreamingPipelineFailsIfServiceFails() throws Exception {
+    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, 
"test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    doReturn(State.FAILED).when(job).getState();
+    assertThat(runner.checkForPAssertSuccess(job), 
equalTo(Optional.of(false)));
+  }
+
+  /**
+   * Tests that if a streaming pipeline crash loops for a non-assertion reason, the test run
+   * throws an {@link AssertionError}.
+   *
+   * <p>This is a known limitation/bug of the runner: it does not distinguish the two modes of
+   * failure.
+   */
+  @Test
+  public void testStreamingPipelineFailsIfException() throws Exception {
+    options.setStreaming(true);
+    Pipeline pipeline = TestPipeline.create(options);
+    PCollection<Integer> pc = pipeline.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+    when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
+        .thenAnswer(new Answer<State>() {
+          @Override
+          public State answer(InvocationOnMock invocation) {
+            JobMessage message = new JobMessage();
+            message.setMessageText("FooException");
+            message.setTime(TimeUtil.toCloudTime(Instant.now()));
+            message.setMessageImportance("JOB_MESSAGE_ERROR");
+            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
+                .process(Arrays.asList(message));
+            return State.CANCELLED;
+          }
+        });
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(false /* success */, true /* 
tentative */));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+
+    try {
+      runner.run(pipeline, mockRunner);
+    } catch (AssertionError exc) {
+      return;
+    }
+    fail("AssertionError expected");
+  }
+
+  @Test
+  public void testGetJobMetricsThatSucceeds() throws Exception {
+    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, 
"test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    JobMetrics metrics = runner.getJobMetrics(job);
+
+    assertEquals(1, metrics.getMetrics().size());
+    assertEquals(generateMockMetrics(true /* success */, true /* tentative */),
+        metrics.getMetrics());
+  }
+
+  @Test
+  public void testGetJobMetricsThatFailsForException() throws Exception {
+    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, 
"test-job", options, null));
+    Pipeline p = TestPipeline.create(options);
+    p.apply(Create.of(1, 2, 3));
+
+    when(mockClient.getJobMetrics(anyString())).thenThrow(new IOException());
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    assertNull(runner.getJobMetrics(job));
+  }
+
+  @Test
+  public void testBatchOnCreateMatcher() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = 
Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    options.as(TestPipelineOptions.class).setOnCreateMatcher(new 
TestSuccessMatcher(mockJob, 0));
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */));
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testStreamingOnCreateMatcher() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = 
Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    options.as(TestPipelineOptions.class).setOnCreateMatcher(new 
TestSuccessMatcher(mockJob, 0));
+
+    when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
+        .thenReturn(State.DONE);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */
+        ));
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception 
{
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = 
Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new 
TestSuccessMatcher(mockJob, 1));
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */));
+    runner.run(p, mockRunner);
+  }
+
+  /**
+   * Tests that when a streaming pipeline terminates and doesn't fail due to {@link PAssert},
+   * the {@link TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is
+   * invoked.
+   */
+  @Test
+  public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws 
Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = 
Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new 
TestSuccessMatcher(mockJob, 1));
+
+    when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
+        .thenReturn(State.DONE);
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */));
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = 
Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.FAILED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new 
TestFailureMatcher());
+
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(false /* success */, true /* 
tentative */));
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      verify(mockJob, Mockito.times(1)).waitUntilFinish(
+          any(Duration.class), any(JobMessagesHandler.class));
+      return;
+    }
+    fail("Expected an exception on pipeline failure.");
+  }
+
+  /**
+   * Tests that when a streaming pipeline terminates in the FAILED state, the {@link
+   * TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is not
+   * invoked.
+   */
+  @Test
+  public void testStreamingOnSuccessMatcherWhenPipelineFails() throws 
Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    final DataflowPipelineJob mockJob = 
Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getState()).thenReturn(State.FAILED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new 
TestFailureMatcher());
+
+    when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
+        .thenReturn(State.FAILED);
+
+    runner.run(p, mockRunner);
+    // If the onSuccessMatcher were invoked, it would have crashed here.
+  }
+
+  static class TestSuccessMatcher extends BaseMatcher<PipelineResult> 
implements
+      SerializableMatcher<PipelineResult> {
+    private final DataflowPipelineJob mockJob;
+    private final int called;
+
+    public TestSuccessMatcher(DataflowPipelineJob job, int times) {
+      this.mockJob = job;
+      this.called = times;
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      if (!(o instanceof PipelineResult)) {
+        fail(String.format("Expected PipelineResult but received %s", o));
+      }
+      try {
+        verify(mockJob, Mockito.times(called)).waitUntilFinish(
+            any(Duration.class), any(JobMessagesHandler.class));
+      } catch (IOException | InterruptedException e) {
+        throw new AssertionError(e);
+      }
+      assertSame(mockJob, o);
+      return true;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+    }
+  }
+
+  static class TestFailureMatcher extends BaseMatcher<PipelineResult> 
implements
+      SerializableMatcher<PipelineResult> {
+    @Override
+    public boolean matches(Object o) {
+      fail("OnSuccessMatcher should not be called on pipeline failure.");
+      return false;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+    }
+  }
+}

Reply via email to