Repository: beam
Updated Branches:
  refs/heads/master 196f4ba0b -> 634bf4e3a


Do not rely on metrics in streaming TestDataflowRunner

The Dataflow service automatically shuts down jobs when their
input watermarks reach infinity, and the watermark-based checks
removed here can be restored when that feature is restored to the
Dataflow service.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ed8bd626
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ed8bd626
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ed8bd626

Branch: refs/heads/master
Commit: ed8bd62652126d8d0cf054cee5cc79dda88e3415
Parents: 196f4ba
Author: Kenneth Knowles <[email protected]>
Authored: Wed Apr 26 17:55:29 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Thu Apr 27 08:50:54 2017 -0700

----------------------------------------------------------------------
 .../dataflow/testing/TestDataflowRunner.java    | 181 ++++------
 .../testing/TestDataflowRunnerTest.java         | 358 +++++++------------
 2 files changed, 195 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ed8bd626/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index dc32466..ba9d971 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -26,13 +26,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.DataflowClient;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
@@ -59,13 +55,6 @@ import org.slf4j.LoggerFactory;
  */
 public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   private static final String TENTATIVE_COUNTER = "tentative";
-  // See https://issues.apache.org/jira/browse/BEAM-1170
-  // we need to either fix the API or pipe the DRAINED signal through
-  @VisibleForTesting
-  static final String LEGACY_WATERMARK_METRIC_SUFFIX = 
"windmill-data-watermark";
-  @VisibleForTesting
-  static final String WATERMARK_METRIC_SUFFIX = "DataWatermark";
-  private static final long MAX_WATERMARK_VALUE = -2L;
   private static final Logger LOG = 
LoggerFactory.getLogger(TestDataflowRunner.class);
 
   private final TestDataflowPipelineOptions options;
@@ -73,9 +62,9 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
   private final DataflowRunner runner;
   private int expectedNumberOfAssertions = 0;
 
-  TestDataflowRunner(TestDataflowPipelineOptions options) {
+  TestDataflowRunner(TestDataflowPipelineOptions options, DataflowClient 
client) {
     this.options = options;
-    this.dataflowClient = DataflowClient.create(options);
+    this.dataflowClient = client;
     this.runner = DataflowRunner.fromOptions(options);
   }
 
@@ -91,7 +80,14 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
         "results");
     dataflowOptions.setTempLocation(tempLocation);
 
-    return new TestDataflowRunner(dataflowOptions);
+    return new TestDataflowRunner(
+        dataflowOptions, 
DataflowClient.create(options.as(DataflowPipelineOptions.class)));
+  }
+
+  @VisibleForTesting
+  static TestDataflowRunner fromOptionsAndClient(
+      TestDataflowPipelineOptions options, DataflowClient client) {
+    return new TestDataflowRunner(options, client);
   }
 
   @Override
@@ -115,40 +111,34 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
         new ErrorMonitorMessagesHandler(job, new 
MonitoringUtil.LoggingHandler());
 
     try {
-      final Optional<Boolean> success;
+      Optional<Boolean> result = Optional.absent();
 
       if (options.isStreaming()) {
-        Future<Optional<Boolean>> resultFuture = 
options.getExecutorService().submit(
-            new Callable<Optional<Boolean>>() {
-          @Override
-          public Optional<Boolean> call() throws Exception {
-            try {
-              for (;;) {
-                JobMetrics metrics = getJobMetrics(job);
-                Optional<Boolean> success = checkForPAssertSuccess(job, 
metrics);
-                if (messageHandler.hasSeenError()) {
-                  return Optional.of(false);
-                }
-
-                if (success.isPresent() && (!success.get() || 
atMaxWatermark(job, metrics))) {
-                  // It's possible that the streaming pipeline doesn't use 
PAssert.
-                  // So checkForSuccess() will return true before job is 
finished.
-                  // atMaxWatermark() will handle this case.
-                  return success;
-                }
-                Thread.sleep(10000L);
-              }
-            } finally {
-              if (!job.getState().isTerminal()) {
-                LOG.info("Cancelling Dataflow job {}", job.getJobId());
-                job.cancel();
-              }
+        // In streaming, there are infinite retries, so rather than timeout
+        // we try to terminate early by polling and canceling if we see
+        // an error message
+        while (true) {
+          State state = job.waitUntilFinish(Duration.standardSeconds(3), 
messageHandler);
+          if (state != null && state.isTerminal()) {
+            break;
+          }
+
+          if (messageHandler.hasSeenError()) {
+            if (!job.getState().isTerminal()) {
+              LOG.info("Cancelling Dataflow job {}", job.getJobId());
+              job.cancel();
             }
+            break;
           }
-        });
+        }
+
+        // Whether we canceled or not, this gets the final state of the job or 
times out
         State finalState =
             job.waitUntilFinish(
                 Duration.standardSeconds(options.getTestTimeoutSeconds()), 
messageHandler);
+
+        // Getting the final state timed out; it may not indicate a failure.
+        // This cancellation may be the second time the job is cancelled.
         if (finalState == null || finalState == State.RUNNING) {
           LOG.info(
               "Dataflow job {} took longer than {} seconds to complete, 
cancelling.",
@@ -156,15 +146,28 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
               options.getTestTimeoutSeconds());
           job.cancel();
         }
-        success = resultFuture.get();
+
+        if (messageHandler.hasSeenError()) {
+          result = Optional.of(false);
+        }
       } else {
         job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler);
-        success = checkForPAssertSuccess(job, getJobMetrics(job));
+        result = checkForPAssertSuccess(job);
       }
-      if (!success.isPresent()) {
-        throw new IllegalStateException(
-            "The dataflow did not output a success or failure metric.");
-      } else if (!success.get()) {
+
+      if (!result.isPresent()) {
+        if (options.isStreaming()) {
+          LOG.warn(
+              "Dataflow job {} did not output a success or failure metric."
+                  + " In rare situations, some PAsserts may not have run."
+                  + " This is a known limitation of Dataflow in streaming.",
+              job.getJobId());
+        } else {
+          throw new IllegalStateException(
+              String.format(
+                  "Dataflow job %s did not output a success or failure 
metric.", job.getJobId()));
+        }
+      } else if (!result.get()) {
         throw new AssertionError(
             Strings.isNullOrEmpty(messageHandler.getErrorMessage())
                 ? String.format(
@@ -177,9 +180,6 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(e);
-    } catch (ExecutionException e) {
-      Throwables.propagateIfPossible(e.getCause());
-      throw new RuntimeException(e.getCause());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -200,22 +200,24 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
   /**
    * Check that PAssert expectations were met.
    *
-   * <p>If the pipeline is not in a failed/cancelled state and no PAsserts 
were used
-   * within the pipeline, then this method will state that all PAsserts 
succeeded.
+   * <p>If the pipeline is not in a failed/cancelled state and no PAsserts 
were used within the
+   * pipeline, then this method will state that all PAsserts succeeded.
    *
-   * @return Optional.of(false) if the job failed, was cancelled or if any 
PAssert
-   *         expectation was not met, true if all the PAssert expectations 
passed,
-   *         Optional.absent() if the metrics were inconclusive.
+   * @return Optional.of(false) if we are certain a PAssert or some other 
critical thing has failed,
+   *     Optional.of(true) if we are certain all PAsserts passed, and 
Optional.absent() if the
+   *     evidence is inconclusive.
    */
   @VisibleForTesting
-  Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job, @Nullable 
JobMetrics metrics)
-      throws IOException {
+  Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) throws 
IOException {
+
+    // If the job failed, this is a definite failure. We only cancel jobs when 
they fail.
     State state = job.getState();
     if (state == State.FAILED || state == State.CANCELLED) {
-      LOG.info("The pipeline failed");
+      LOG.info("Dataflow job {} terminated in failure state {}", 
job.getJobId(), state);
       return Optional.of(false);
     }
 
+    JobMetrics metrics = getJobMetrics(job);
     if (metrics == null || metrics.getMetrics() == null) {
       LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
       return Optional.absent();
@@ -238,66 +240,31 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     }
 
     if (failures > 0) {
-      LOG.info("Found result while running Dataflow job {}. Found {} success, 
{} failures out of "
+      LOG.info("Failure result for Dataflow job {}. Found {} success, {} 
failures out of "
           + "{} expected assertions.", job.getJobId(), successes, failures,
           expectedNumberOfAssertions);
       return Optional.of(false);
     } else if (successes >= expectedNumberOfAssertions) {
-      LOG.info("Found result while running Dataflow job {}. Found {} success, 
{} failures out of "
-          + "{} expected assertions.", job.getJobId(), successes, failures,
+      LOG.info(
+          "Success result for Dataflow job {}."
+              + " Found {} success, {} failures out of {} expected 
assertions.",
+          job.getJobId(),
+          successes,
+          failures,
           expectedNumberOfAssertions);
       return Optional.of(true);
     }
 
-    LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} 
expected "
-        + "assertions.", job.getJobId(), successes, failures, 
expectedNumberOfAssertions);
+    LOG.info(
+        "Inconclusive results for Dataflow job {}."
+            + " Found {} success, {} failures out of {} expected assertions.",
+        job.getJobId(),
+        successes,
+        failures,
+        expectedNumberOfAssertions);
     return Optional.absent();
   }
 
-  /**
-   * Checks whether a metric is a streaming watermark.
-   *
-   * @return true if the metric is a watermark.
-   */
-  boolean isWatermark(MetricUpdate metric) {
-    if (metric.getName() == null || metric.getName().getName() == null) {
-      return false; // no name -> shouldn't happen, not the watermark
-    }
-    if (metric.getScalar() == null) {
-      return false; // no scalar value -> not the watermark
-    }
-    String name = metric.getName().getName();
-    return name.endsWith(LEGACY_WATERMARK_METRIC_SUFFIX)
-        || name.endsWith(WATERMARK_METRIC_SUFFIX);
-  }
-
-  /**
-   * Check watermarks of the streaming job. At least one watermark metric must 
exist.
-   *
-   * @return true if all watermarks are at max, false otherwise.
-   */
-  @VisibleForTesting
-  boolean atMaxWatermark(DataflowPipelineJob job, JobMetrics metrics) {
-    boolean hasMaxWatermark = false;
-    for (MetricUpdate metric : metrics.getMetrics()) {
-      if (!isWatermark(metric)) {
-        continue;
-      }
-      BigDecimal watermark = (BigDecimal) metric.getScalar();
-      hasMaxWatermark = watermark.longValue() == MAX_WATERMARK_VALUE;
-      if (!hasMaxWatermark) {
-        LOG.info("Found a non-max watermark metric {} in job {}", 
metric.getName().getName(),
-            job.getJobId());
-        return false;
-      }
-    }
-
-    if (hasMaxWatermark) {
-      LOG.info("All watermarks are at max. JobID: {}", job.getJobId());
-    }
-    return hasMaxWatermark;
-  }
-
   @Nullable
   @VisibleForTesting
   JobMetrics getJobMetrics(DataflowPipelineJob job) {

http://git-wip-us.apache.org/repos/asf/beam/blob/ed8bd626/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index e4fa788..307393c 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -17,32 +17,21 @@
  */
 package org.apache.beam.runners.dataflow.testing;
 
-import static 
org.apache.beam.runners.dataflow.testing.TestDataflowRunner.LEGACY_WATERMARK_METRIC_SUFFIX;
-import static 
org.apache.beam.runners.dataflow.testing.TestDataflowRunner.WATERMARK_METRIC_SUFFIX;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.google.api.client.http.LowLevelHttpResponse;
-import com.google.api.client.json.Json;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpRequest;
-import com.google.api.client.testing.http.MockLowLevelHttpResponse;
-import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.JobMessage;
 import com.google.api.services.dataflow.model.JobMetrics;
 import com.google.api.services.dataflow.model.MetricStructuredName;
@@ -55,6 +44,7 @@ import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.dataflow.DataflowClient;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -69,7 +59,6 @@ import org.apache.beam.sdk.testing.SerializableMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.NoopPathValidator;
 import org.apache.beam.sdk.util.TestCredential;
 import org.apache.beam.sdk.util.Transport;
@@ -94,21 +83,13 @@ import org.mockito.stubbing.Answer;
 @RunWith(JUnit4.class)
 public class TestDataflowRunnerTest {
   @Rule public ExpectedException expectedException = ExpectedException.none();
-  @Mock private MockHttpTransport transport;
-  @Mock private MockLowLevelHttpRequest request;
-  @Mock private GcsUtil mockGcsUtil;
-
-  private static final BigDecimal DEFAULT_MAX_WATERMARK = new BigDecimal(-2);
+  @Mock private DataflowClient mockClient;
 
   private TestDataflowPipelineOptions options;
-  private Dataflow service;
 
   @Before
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
-    when(transport.buildRequest(anyString(), anyString())).thenReturn(request);
-    doCallRealMethod().when(request).getContentAsString();
-    service = new Dataflow(transport, Transport.getJsonFactory(), null);
 
     options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
     options.setAppName("TestAppName");
@@ -116,7 +97,6 @@ public class TestDataflowRunnerTest {
     options.setTempLocation("gs://test/temp/location");
     options.setTempRoot("gs://test");
     options.setGcpCredential(new TestCredential());
-    options.setDataflowClient(service);
     options.setRunner(TestDataflowRunner.class);
     options.setPathValidatorClass(NoopPathValidator.class);
   }
@@ -124,12 +104,12 @@ public class TestDataflowRunnerTest {
   @Test
   public void testToString() {
     assertEquals("TestDataflowRunner#TestAppName",
-        new TestDataflowRunner(options).toString());
+        TestDataflowRunner.fromOptions(options).toString());
   }
 
   @Test
   public void testRunBatchJobThatSucceeds() throws Exception {
-    Pipeline p = TestPipeline.create(options);
+    Pipeline p = Pipeline.create(options);
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
@@ -141,9 +121,9 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
-    when(request.execute()).thenReturn(generateMockMetricResponse(true /* 
success */,
-        true /* tentative */, null /* additionalMetrics */));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */));
     assertEquals(mockJob, runner.run(p, mockRunner));
   }
 
@@ -161,9 +141,9 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
-    when(request.execute()).thenReturn(generateMockMetricResponse(false /* 
success */,
-        false /* tentative */, null /* additionalMetrics */));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(false /* success */, false /* 
tentative */));
     try {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {
@@ -201,9 +181,9 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    when(request.execute()).thenReturn(generateMockMetricResponse(false /* 
success */,
-        true /* tentative */, null /* additionalMetrics */));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(false /* success */, true /* 
tentative */));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     try {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {
@@ -216,6 +196,9 @@ public class TestDataflowRunnerTest {
     fail("AssertionError expected");
   }
 
+  /**
+   * A streaming job that terminates with no error messages is a success.
+   */
   @Test
   public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception {
     options.setStreaming(true);
@@ -224,17 +207,18 @@ public class TestDataflowRunnerTest {
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
     DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
+        .thenReturn(State.DONE);
     when(mockJob.getProjectId()).thenReturn("test-project");
     when(mockJob.getJobId()).thenReturn("test-job");
 
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    when(request.execute())
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */,
-            ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     runner.run(p, mockRunner);
   }
 
@@ -245,60 +229,37 @@ public class TestDataflowRunnerTest {
     p.apply(Create.of(1, 2, 3));
 
     DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
+        .thenReturn(State.DONE);
     when(mockJob.getProjectId()).thenReturn("test-project");
     when(mockJob.getJobId()).thenReturn("test-job");
 
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    when(request.execute())
-        .thenReturn(generateMockStreamingMetricResponse(
-            ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockStreamingMetricResponse(ImmutableMap.<String, 
BigDecimal>of()));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     runner.run(p, mockRunner);
   }
 
+  /**
+   * Tests that a streaming job with a false {@link PAssert} fails.
+   *
+   * <p>Currently, this failure is indistinguishable from a non-{@link 
PAssert} failure, because it
+   * is detected only by failure job messages. With fuller metric support, 
this can detect a PAssert
+   * failure via metrics and raise an {@link AssertionError} in just that case.
+   */
   @Test
   public void testRunStreamingJobThatFails() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.RUNNING);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(request.execute()).thenReturn(generateMockMetricResponse(false /* 
success */,
-        true /* tentative */, null /* additionalMetrics */));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      return;
-    }
-    // Note that fail throws an AssertionError which is why it is placed out 
here
-    // instead of inside the try-catch block.
-    fail("AssertionError expected");
+    testStreamingPipelineFailsIfException();
   }
 
-  private LowLevelHttpResponse generateMockMetricResponse(boolean success, 
boolean tentative,
-                                                          Map<String, 
BigDecimal> additionalMetrics)
+  private JobMetrics generateMockMetricResponse(boolean success, boolean 
tentative)
       throws Exception {
-    MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
-    response.setContentType(Json.MEDIA_TYPE);
     List<MetricUpdate> metrics = generateMockMetrics(success, tentative);
-    if (additionalMetrics != null && !additionalMetrics.isEmpty()) {
-      metrics.addAll(generateMockStreamingMetrics(additionalMetrics));
-    }
-    JobMetrics jobMetrics = buildJobMetrics(metrics);
-    response.setContent(jobMetrics.toPrettyString());
-    return response;
+    return buildJobMetrics(metrics);
   }
 
   private List<MetricUpdate> generateMockMetrics(boolean success, boolean 
tentative) {
@@ -313,13 +274,9 @@ public class TestDataflowRunnerTest {
     return Lists.newArrayList(metric);
   }
 
-  private LowLevelHttpResponse generateMockStreamingMetricResponse(Map<String,
+  private JobMetrics generateMockStreamingMetricResponse(Map<String,
       BigDecimal> metricMap) throws IOException {
-    MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
-    response.setContentType(Json.MEDIA_TYPE);
-    JobMetrics jobMetrics = 
buildJobMetrics(generateMockStreamingMetrics(metricMap));
-    response.setContent(jobMetrics.toPrettyString());
-    return response;
+    return buildJobMetrics(generateMockStreamingMetrics(metricMap));
   }
 
   private List<MetricUpdate> generateMockStreamingMetrics(Map<String, 
BigDecimal> metricMap) {
@@ -344,6 +301,10 @@ public class TestDataflowRunnerTest {
     return jobMetrics;
   }
 
+  /**
+   * Tests that a tentative {@code true} from metrics indicates that every 
{@link PAssert} has
+   * succeeded.
+   */
   @Test
   public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
     DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, 
null));
@@ -351,13 +312,18 @@ public class TestDataflowRunnerTest {
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(buildJobMetrics(generateMockMetrics(true /* success */, 
true /* tentative */)));
+
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     doReturn(State.DONE).when(job).getState();
-    JobMetrics metrics = buildJobMetrics(
-        generateMockMetrics(true /* success */, true /* tentative */));
-    assertEquals(Optional.of(true), runner.checkForPAssertSuccess(job, 
metrics));
+    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(true)));
   }
 
+  /**
+   * Tests that when we just see a tentative failure for a {@link PAssert} it 
is considered a
+   * conclusive failure.
+   */
   @Test
   public void testCheckingForSuccessWhenPAssertFails() throws Exception {
     DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, 
null));
@@ -365,11 +331,13 @@ public class TestDataflowRunnerTest {
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(
+            buildJobMetrics(generateMockMetrics(false /* success */, true /* 
tentative */)));
+
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     doReturn(State.DONE).when(job).getState();
-    JobMetrics metrics = buildJobMetrics(
-        generateMockMetrics(false /* success */, true /* tentative */));
-    assertEquals(Optional.of(false), runner.checkForPAssertSuccess(job, 
metrics));
+    assertThat(runner.checkForPAssertSuccess(job), 
equalTo(Optional.of(false)));
   }
 
   @Test
@@ -379,108 +347,20 @@ public class TestDataflowRunnerTest {
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
-    runner.updatePAssertCount(p);
-    doReturn(State.RUNNING).when(job).getState();
-    JobMetrics metrics = buildJobMetrics(
-        generateMockMetrics(true /* success */, false /* tentative */));
-    assertEquals(Optional.absent(), runner.checkForPAssertSuccess(job, 
metrics));
-  }
-
-  @Test
-  public void testCheckMaxWatermarkWithNoWatermarkMetric() throws IOException {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, 
null));
-    Pipeline p = TestPipeline.create(options);
-    p.apply(Create.of(1, 2, 3));
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
-    JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
-        ImmutableMap.of("no-watermark", new BigDecimal(100))));
-    doReturn(State.RUNNING).when(job).getState();
-    assertFalse(runner.atMaxWatermark(job, metrics));
-  }
-
-  @Test
-  public void testCheckMaxWatermarkWithSingleWatermarkAtMax() throws 
IOException {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, 
null));
-    Pipeline p = TestPipeline.create(options);
-    p.apply(Create.of(1, 2, 3));
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
-    JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
-        ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
-    doReturn(State.RUNNING).when(job).getState();
-    assertTrue(runner.atMaxWatermark(job, metrics));
-  }
-
-  @Test
-  public void testCheckMaxWatermarkWithLegacyWatermarkAtMax() throws 
IOException {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, 
null));
-    Pipeline p = TestPipeline.create(options);
-    p.apply(Create.of(1, 2, 3));
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
-    JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
-        ImmutableMap.of(LEGACY_WATERMARK_METRIC_SUFFIX, 
DEFAULT_MAX_WATERMARK)));
-    doReturn(State.RUNNING).when(job).getState();
-    assertTrue(runner.atMaxWatermark(job, metrics));
-  }
-
-  @Test
-  public void testCheckMaxWatermarkWithSingleWatermarkNotAtMax() throws 
IOException {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, 
null));
-    Pipeline p = TestPipeline.create(options);
-    p.apply(Create.of(1, 2, 3));
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(
+            buildJobMetrics(generateMockMetrics(true /* success */, false /* 
tentative */)));
 
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
-    JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics
-        (ImmutableMap.of(WATERMARK_METRIC_SUFFIX, new BigDecimal(100))));
-    doReturn(State.RUNNING).when(job).getState();
-    assertFalse(runner.atMaxWatermark(job, metrics));
-  }
-
-  @Test
-  public void testCheckMaxWatermarkWithMultipleWatermarksAtMax() throws 
IOException {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, 
null));
-    Pipeline p = TestPipeline.create(options);
-    p.apply(Create.of(1, 2, 3));
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
-    JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
-        ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK,
-            "two" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
-    doReturn(State.RUNNING).when(job).getState();
-    assertTrue(runner.atMaxWatermark(job, metrics));
-  }
-
-  @Test
-  public void testCheckMaxWatermarkWithMultipleMaxAndNotMaxWatermarks() throws 
IOException {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, 
null));
-    Pipeline p = TestPipeline.create(options);
-    p.apply(Create.of(1, 2, 3));
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
-    JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
-        ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK,
-            "two" + WATERMARK_METRIC_SUFFIX, new BigDecimal(100))));
-    doReturn(State.RUNNING).when(job).getState();
-    assertFalse(runner.atMaxWatermark(job, metrics));
-  }
-
-  @Test
-  public void testCheckMaxWatermarkIgnoresUnrelatedMatrics() throws 
IOException {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, 
null));
-    Pipeline p = TestPipeline.create(options);
-    p.apply(Create.of(1, 2, 3));
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
-    JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics(
-        ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK,
-            "no-watermark", new BigDecimal(100))));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+    runner.updatePAssertCount(p);
     doReturn(State.RUNNING).when(job).getState();
-    assertTrue(runner.atMaxWatermark(job, metrics));
+    assertThat(runner.checkForPAssertSuccess(job), 
equalTo(Optional.<Boolean>absent()));
   }
 
+  /**
+   * Tests that if a streaming pipeline terminates in the FAILED state, the check for PAssert
+   * success is a conclusive failure.
+   */
   @Test
   public void testStreamingPipelineFailsIfServiceFails() throws Exception {
     DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, 
null));
@@ -488,16 +368,23 @@ public class TestDataflowRunnerTest {
     PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     doReturn(State.FAILED).when(job).getState();
-    assertEquals(Optional.of(false), runner.checkForPAssertSuccess(job, null 
/* metrics */));
+    assertThat(runner.checkForPAssertSuccess(job), 
equalTo(Optional.of(false)));
   }
 
+  /**
+   * Tests that if a streaming pipeline crash-loops for a non-assertion reason, the test run
+   * throws an {@link AssertionError}.
+   *
+   * <p>This is a known limitation/bug of the runner: it does not distinguish between these two
+   * modes of failure.
+   */
   @Test
   public void testStreamingPipelineFailsIfException() throws Exception {
     options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    Pipeline pipeline = TestPipeline.create(options);
+    PCollection<Integer> pc = pipeline.apply(Create.of(1, 2, 3));
     PAssert.that(pc).containsInAnyOrder(1, 2, 3);
 
     DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
@@ -521,18 +408,15 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    when(request.execute()).thenReturn(generateMockMetricResponse(false /* 
success */,
-        true /* tentative */, null /* additionalMetrics */));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(false /* success */, true /* 
tentative */));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
+
     try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      assertThat(expected.getMessage(), containsString("FooException"));
-      verify(mockJob, atLeastOnce()).cancel();
+      runner.run(pipeline, mockRunner);
+    } catch (AssertionError exc) {
       return;
     }
-    // Note that fail throws an AssertionError which is why it is placed out 
here
-    // instead of inside the try-catch block.
     fail("AssertionError expected");
   }
 
@@ -542,9 +426,9 @@ public class TestDataflowRunnerTest {
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
-    when(request.execute()).thenReturn(generateMockMetricResponse(true /* 
success */,
-        true /* tentative */, null /* additionalMetrics */));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */));
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     JobMetrics metrics = runner.getJobMetrics(job);
 
     assertEquals(1, metrics.getMetrics().size());
@@ -558,8 +442,8 @@ public class TestDataflowRunnerTest {
     Pipeline p = TestPipeline.create(options);
     p.apply(Create.of(1, 2, 3));
 
-    when(request.execute()).thenThrow(new IOException());
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+    when(mockClient.getJobMetrics(anyString())).thenThrow(new IOException());
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     assertNull(runner.getJobMetrics(job));
   }
 
@@ -577,12 +461,12 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     p.getOptions().as(TestPipelineOptions.class)
         .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
 
-    when(request.execute()).thenReturn(generateMockMetricResponse(true /* 
success */,
-        true /* tentative */, null /* additionalMetrics */));
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */));
     runner.run(p, mockRunner);
   }
 
@@ -601,16 +485,16 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     p.getOptions().as(TestPipelineOptions.class)
         .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
 
     when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
         .thenReturn(State.DONE);
 
-    when(request.execute())
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */,
-            ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */
+        ));
     runner.run(p, mockRunner);
   }
 
@@ -628,15 +512,20 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
 
-    when(request.execute()).thenReturn(generateMockMetricResponse(true /* 
success */,
-        true /* tentative */, null /* additionalMetrics */));
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */));
     runner.run(p, mockRunner);
   }
 
+  /**
+   * Tests that when a streaming pipeline terminates without a {@link PAssert} failure, the
+   * {@link TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is
+   * invoked.
+   */
   @Test
   public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws 
Exception {
     options.setStreaming(true);
@@ -652,16 +541,15 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
 
     when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
         .thenReturn(State.DONE);
 
-    when(request.execute())
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */,
-            ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK)));
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(true /* success */, true /* 
tentative */));
     runner.run(p, mockRunner);
   }
 
@@ -679,12 +567,12 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestFailureMatcher());
 
-    when(request.execute()).thenReturn(generateMockMetricResponse(false /* 
success */,
-        true /* tentative */, null /* additionalMetrics */));
+    when(mockClient.getJobMetrics(anyString()))
+        .thenReturn(generateMockMetricResponse(false /* success */, true /* 
tentative */));
     try {
       runner.run(p, mockRunner);
     } catch (AssertionError expected) {
@@ -695,6 +583,11 @@ public class TestDataflowRunnerTest {
     fail("Expected an exception on pipeline failure.");
   }
 
+  /**
+   * Tests that when a streaming pipeline terminates in the FAILED state, the {@link
+   * TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is not
+   * invoked.
+   */
   @Test
   public void testStreamingOnSuccessMatcherWhenPipelineFails() throws 
Exception {
     options.setStreaming(true);
@@ -710,24 +603,15 @@ public class TestDataflowRunnerTest {
     DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
     when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
 
-    TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     p.getOptions().as(TestPipelineOptions.class)
         .setOnSuccessMatcher(new TestFailureMatcher());
 
     when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
         .thenReturn(State.FAILED);
 
-    when(request.execute()).thenReturn(
-        generateMockMetricResponse(false /* success */, true /* tentative */,
-            ImmutableMap.of(WATERMARK_METRIC_SUFFIX, new BigDecimal(100))));
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      verify(mockJob, Mockito.times(1)).waitUntilFinish(any(Duration.class),
-          any(JobMessagesHandler.class));
-      return;
-    }
-    fail("Expected an exception on pipeline failure.");
+    runner.run(p, mockRunner);
+    // If the onSuccessMatcher were invoked, it would have crashed here.
   }
 
   static class TestSuccessMatcher extends BaseMatcher<PipelineResult> 
implements

Reply via email to