Repository: beam Updated Branches: refs/heads/master 196f4ba0b -> 634bf4e3a
Do not rely on metrics in streaming TestDataflowRunner. The Dataflow service automatically shuts down jobs when their input watermarks reach infinity; other functionality can be reinstated once the feature is restored to the Dataflow service. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ed8bd626 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ed8bd626 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ed8bd626 Branch: refs/heads/master Commit: ed8bd62652126d8d0cf054cee5cc79dda88e3415 Parents: 196f4ba Author: Kenneth Knowles <[email protected]> Authored: Wed Apr 26 17:55:29 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Apr 27 08:50:54 2017 -0700 ---------------------------------------------------------------------- .../dataflow/testing/TestDataflowRunner.java | 181 ++++------ .../testing/TestDataflowRunnerTest.java | 358 +++++++------------ 2 files changed, 195 insertions(+), 344 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ed8bd626/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index dc32466..ba9d971 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -26,13 +26,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Optional; 
import com.google.common.base.Strings; -import com.google.common.base.Throwables; import java.io.IOException; import java.math.BigDecimal; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowClient; import org.apache.beam.runners.dataflow.DataflowPipelineJob; @@ -59,13 +55,6 @@ import org.slf4j.LoggerFactory; */ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { private static final String TENTATIVE_COUNTER = "tentative"; - // See https://issues.apache.org/jira/browse/BEAM-1170 - // we need to either fix the API or pipe the DRAINED signal through - @VisibleForTesting - static final String LEGACY_WATERMARK_METRIC_SUFFIX = "windmill-data-watermark"; - @VisibleForTesting - static final String WATERMARK_METRIC_SUFFIX = "DataWatermark"; - private static final long MAX_WATERMARK_VALUE = -2L; private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class); private final TestDataflowPipelineOptions options; @@ -73,9 +62,9 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { private final DataflowRunner runner; private int expectedNumberOfAssertions = 0; - TestDataflowRunner(TestDataflowPipelineOptions options) { + TestDataflowRunner(TestDataflowPipelineOptions options, DataflowClient client) { this.options = options; - this.dataflowClient = DataflowClient.create(options); + this.dataflowClient = client; this.runner = DataflowRunner.fromOptions(options); } @@ -91,7 +80,14 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { "results"); dataflowOptions.setTempLocation(tempLocation); - return new TestDataflowRunner(dataflowOptions); + return new TestDataflowRunner( + dataflowOptions, DataflowClient.create(options.as(DataflowPipelineOptions.class))); + } + + @VisibleForTesting + static 
TestDataflowRunner fromOptionsAndClient( + TestDataflowPipelineOptions options, DataflowClient client) { + return new TestDataflowRunner(options, client); } @Override @@ -115,40 +111,34 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler()); try { - final Optional<Boolean> success; + Optional<Boolean> result = Optional.absent(); if (options.isStreaming()) { - Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit( - new Callable<Optional<Boolean>>() { - @Override - public Optional<Boolean> call() throws Exception { - try { - for (;;) { - JobMetrics metrics = getJobMetrics(job); - Optional<Boolean> success = checkForPAssertSuccess(job, metrics); - if (messageHandler.hasSeenError()) { - return Optional.of(false); - } - - if (success.isPresent() && (!success.get() || atMaxWatermark(job, metrics))) { - // It's possible that the streaming pipeline doesn't use PAssert. - // So checkForSuccess() will return true before job is finished. - // atMaxWatermark() will handle this case. 
- return success; - } - Thread.sleep(10000L); - } - } finally { - if (!job.getState().isTerminal()) { - LOG.info("Cancelling Dataflow job {}", job.getJobId()); - job.cancel(); - } + // In streaming, there are infinite retries, so rather than timeout + // we try to terminate early by polling and canceling if we see + // an error message + while (true) { + State state = job.waitUntilFinish(Duration.standardSeconds(3), messageHandler); + if (state != null && state.isTerminal()) { + break; + } + + if (messageHandler.hasSeenError()) { + if (!job.getState().isTerminal()) { + LOG.info("Cancelling Dataflow job {}", job.getJobId()); + job.cancel(); } + break; } - }); + } + + // Whether we canceled or not, this gets the final state of the job or times out State finalState = job.waitUntilFinish( Duration.standardSeconds(options.getTestTimeoutSeconds()), messageHandler); + + // Getting the final state timed out; it may not indicate a failure. + // This cancellation may be the second if (finalState == null || finalState == State.RUNNING) { LOG.info( "Dataflow job {} took longer than {} seconds to complete, cancelling.", @@ -156,15 +146,28 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { options.getTestTimeoutSeconds()); job.cancel(); } - success = resultFuture.get(); + + if (messageHandler.hasSeenError()) { + result = Optional.of(false); + } } else { job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler); - success = checkForPAssertSuccess(job, getJobMetrics(job)); + result = checkForPAssertSuccess(job); } - if (!success.isPresent()) { - throw new IllegalStateException( - "The dataflow did not output a success or failure metric."); - } else if (!success.get()) { + + if (!result.isPresent()) { + if (options.isStreaming()) { + LOG.warn( + "Dataflow job {} did not output a success or failure metric." + + " In rare situations, some PAsserts may not have run." 
+ + " This is a known limitation of Dataflow in streaming.", + job.getJobId()); + } else { + throw new IllegalStateException( + String.format( + "Dataflow job %s did not output a success or failure metric.", job.getJobId())); + } + } else if (!result.get()) { throw new AssertionError( Strings.isNullOrEmpty(messageHandler.getErrorMessage()) ? String.format( @@ -177,9 +180,6 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); - } catch (ExecutionException e) { - Throwables.propagateIfPossible(e.getCause()); - throw new RuntimeException(e.getCause()); } catch (IOException e) { throw new RuntimeException(e); } @@ -200,22 +200,24 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { /** * Check that PAssert expectations were met. * - * <p>If the pipeline is not in a failed/cancelled state and no PAsserts were used - * within the pipeline, then this method will state that all PAsserts succeeded. + * <p>If the pipeline is not in a failed/cancelled state and no PAsserts were used within the + * pipeline, then this method will state that all PAsserts succeeded. * - * @return Optional.of(false) if the job failed, was cancelled or if any PAssert - * expectation was not met, true if all the PAssert expectations passed, - * Optional.absent() if the metrics were inconclusive. + * @return Optional.of(false) if we are certain a PAssert or some other critical thing has failed, + * Optional.of(true) if we are certain all PAsserts passed, and Optional.absent() if the + * evidence is inconclusive. */ @VisibleForTesting - Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job, @Nullable JobMetrics metrics) - throws IOException { + Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) throws IOException { + + // If the job failed, this is a definite failure. We only cancel jobs when they fail. 
State state = job.getState(); if (state == State.FAILED || state == State.CANCELLED) { - LOG.info("The pipeline failed"); + LOG.info("Dataflow job {} terminated in failure state {}", job.getJobId(), state); return Optional.of(false); } + JobMetrics metrics = getJobMetrics(job); if (metrics == null || metrics.getMetrics() == null) { LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId()); return Optional.absent(); @@ -238,66 +240,31 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { } if (failures > 0) { - LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " + LOG.info("Failure result for Dataflow job {}. Found {} success, {} failures out of " + "{} expected assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions); return Optional.of(false); } else if (successes >= expectedNumberOfAssertions) { - LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " - + "{} expected assertions.", job.getJobId(), successes, failures, + LOG.info( + "Success result for Dataflow job {}." + + " Found {} success, {} failures out of {} expected assertions.", + job.getJobId(), + successes, + failures, expectedNumberOfAssertions); return Optional.of(true); } - LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected " - + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions); + LOG.info( + "Inconclusive results for Dataflow job {}." + + " Found {} success, {} failures out of {} expected assertions.", + job.getJobId(), + successes, + failures, + expectedNumberOfAssertions); return Optional.absent(); } - /** - * Checks wether a metric is a streaming watermark. - * - * @return true if the metric is a watermark. 
- */ - boolean isWatermark(MetricUpdate metric) { - if (metric.getName() == null || metric.getName().getName() == null) { - return false; // no name -> shouldn't happen, not the watermark - } - if (metric.getScalar() == null) { - return false; // no scalar value -> not the watermark - } - String name = metric.getName().getName(); - return name.endsWith(LEGACY_WATERMARK_METRIC_SUFFIX) - || name.endsWith(WATERMARK_METRIC_SUFFIX); - } - - /** - * Check watermarks of the streaming job. At least one watermark metric must exist. - * - * @return true if all watermarks are at max, false otherwise. - */ - @VisibleForTesting - boolean atMaxWatermark(DataflowPipelineJob job, JobMetrics metrics) { - boolean hasMaxWatermark = false; - for (MetricUpdate metric : metrics.getMetrics()) { - if (!isWatermark(metric)) { - continue; - } - BigDecimal watermark = (BigDecimal) metric.getScalar(); - hasMaxWatermark = watermark.longValue() == MAX_WATERMARK_VALUE; - if (!hasMaxWatermark) { - LOG.info("Found a non-max watermark metric {} in job {}", metric.getName().getName(), - job.getJobId()); - return false; - } - } - - if (hasMaxWatermark) { - LOG.info("All watermarks are at max. 
JobID: {}", job.getJobId()); - } - return hasMaxWatermark; - } - @Nullable @VisibleForTesting JobMetrics getJobMetrics(DataflowPipelineJob job) { http://git-wip-us.apache.org/repos/asf/beam/blob/ed8bd626/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java index e4fa788..307393c 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -17,32 +17,21 @@ */ package org.apache.beam.runners.dataflow.testing; -import static org.apache.beam.runners.dataflow.testing.TestDataflowRunner.LEGACY_WATERMARK_METRIC_SUFFIX; -import static org.apache.beam.runners.dataflow.testing.TestDataflowRunner.WATERMARK_METRIC_SUFFIX; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; 
-import com.google.api.client.http.LowLevelHttpResponse; -import com.google.api.client.json.Json; -import com.google.api.client.testing.http.MockHttpTransport; -import com.google.api.client.testing.http.MockLowLevelHttpRequest; -import com.google.api.client.testing.http.MockLowLevelHttpResponse; -import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.JobMessage; import com.google.api.services.dataflow.model.JobMetrics; import com.google.api.services.dataflow.model.MetricStructuredName; @@ -55,6 +44,7 @@ import java.math.BigDecimal; import java.util.Arrays; import java.util.List; import java.util.Map; +import org.apache.beam.runners.dataflow.DataflowClient; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.util.MonitoringUtil; @@ -69,7 +59,6 @@ import org.apache.beam.sdk.testing.SerializableMatcher; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.NoopPathValidator; import org.apache.beam.sdk.util.TestCredential; import org.apache.beam.sdk.util.Transport; @@ -94,21 +83,13 @@ import org.mockito.stubbing.Answer; @RunWith(JUnit4.class) public class TestDataflowRunnerTest { @Rule public ExpectedException expectedException = ExpectedException.none(); - @Mock private MockHttpTransport transport; - @Mock private MockLowLevelHttpRequest request; - @Mock private GcsUtil mockGcsUtil; - - private static final BigDecimal DEFAULT_MAX_WATERMARK = new BigDecimal(-2); + @Mock private DataflowClient mockClient; private TestDataflowPipelineOptions options; - private Dataflow service; @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); - when(transport.buildRequest(anyString(), anyString())).thenReturn(request); - 
doCallRealMethod().when(request).getContentAsString(); - service = new Dataflow(transport, Transport.getJsonFactory(), null); options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); options.setAppName("TestAppName"); @@ -116,7 +97,6 @@ public class TestDataflowRunnerTest { options.setTempLocation("gs://test/temp/location"); options.setTempRoot("gs://test"); options.setGcpCredential(new TestCredential()); - options.setDataflowClient(service); options.setRunner(TestDataflowRunner.class); options.setPathValidatorClass(NoopPathValidator.class); } @@ -124,12 +104,12 @@ public class TestDataflowRunnerTest { @Test public void testToString() { assertEquals("TestDataflowRunner#TestAppName", - new TestDataflowRunner(options).toString()); + TestDataflowRunner.fromOptions(options).toString()); } @Test public void testRunBatchJobThatSucceeds() throws Exception { - Pipeline p = TestPipeline.create(options); + Pipeline p = Pipeline.create(options); PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); @@ -141,9 +121,9 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */, - true /* tentative */, null /* additionalMetrics */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); assertEquals(mockJob, runner.run(p, mockRunner)); } @@ -161,9 +141,9 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - 
when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, - false /* tentative */, null /* additionalMetrics */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(false /* success */, false /* tentative */)); try { runner.run(p, mockRunner); } catch (AssertionError expected) { @@ -201,9 +181,9 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, - true /* tentative */, null /* additionalMetrics */)); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); try { runner.run(p, mockRunner); } catch (AssertionError expected) { @@ -216,6 +196,9 @@ public class TestDataflowRunnerTest { fail("AssertionError expected"); } + /** + * A streaming job that terminates with no error messages is a success. 
+ */ @Test public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception { options.setStreaming(true); @@ -224,17 +207,18 @@ public class TestDataflowRunnerTest { PAssert.that(pc).containsInAnyOrder(1, 2, 3); DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getState()).thenReturn(State.RUNNING); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) + .thenReturn(State.DONE); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job"); DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - when(request.execute()) - .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */, - ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); runner.run(p, mockRunner); } @@ -245,60 +229,37 @@ public class TestDataflowRunnerTest { p.apply(Create.of(1, 2, 3)); DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getState()).thenReturn(State.RUNNING); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) + .thenReturn(State.DONE); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job"); DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - when(request.execute()) - .thenReturn(generateMockStreamingMetricResponse( - ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); - 
TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockStreamingMetricResponse(ImmutableMap.<String, BigDecimal>of())); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); runner.run(p, mockRunner); } + /** + * Tests that a streaming job with a false {@link PAssert} fails. + * + * <p>Currently, this failure is indistinguishable from a non-{@link PAssert} failure, because it + * is detected only by failure job messages. With fuller metric support, this can detect a PAssert + * failure via metrics and raise an {@link AssertionError} in just that case. + */ @Test public void testRunStreamingJobThatFails() throws Exception { - options.setStreaming(true); - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getState()).thenReturn(State.RUNNING); - when(mockJob.getProjectId()).thenReturn("test-project"); - when(mockJob.getJobId()).thenReturn("test-job"); - - DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); - when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - - when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, - true /* tentative */, null /* additionalMetrics */)); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - try { - runner.run(p, mockRunner); - } catch (AssertionError expected) { - return; - } - // Note that fail throws an AssertionError which is why it is placed out here - // instead of inside the try-catch block. 
- fail("AssertionError expected"); + testStreamingPipelineFailsIfException(); } - private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative, - Map<String, BigDecimal> additionalMetrics) + private JobMetrics generateMockMetricResponse(boolean success, boolean tentative) throws Exception { - MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); - response.setContentType(Json.MEDIA_TYPE); List<MetricUpdate> metrics = generateMockMetrics(success, tentative); - if (additionalMetrics != null && !additionalMetrics.isEmpty()) { - metrics.addAll(generateMockStreamingMetrics(additionalMetrics)); - } - JobMetrics jobMetrics = buildJobMetrics(metrics); - response.setContent(jobMetrics.toPrettyString()); - return response; + return buildJobMetrics(metrics); } private List<MetricUpdate> generateMockMetrics(boolean success, boolean tentative) { @@ -313,13 +274,9 @@ public class TestDataflowRunnerTest { return Lists.newArrayList(metric); } - private LowLevelHttpResponse generateMockStreamingMetricResponse(Map<String, + private JobMetrics generateMockStreamingMetricResponse(Map<String, BigDecimal> metricMap) throws IOException { - MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); - response.setContentType(Json.MEDIA_TYPE); - JobMetrics jobMetrics = buildJobMetrics(generateMockStreamingMetrics(metricMap)); - response.setContent(jobMetrics.toPrettyString()); - return response; + return buildJobMetrics(generateMockStreamingMetrics(metricMap)); } private List<MetricUpdate> generateMockStreamingMetrics(Map<String, BigDecimal> metricMap) { @@ -344,6 +301,10 @@ public class TestDataflowRunnerTest { return jobMetrics; } + /** + * Tests that a tentative {@code true} from metrics indicates that every {@link PAssert} has + * succeeded. 
+ */ @Test public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception { DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); @@ -351,13 +312,18 @@ public class TestDataflowRunnerTest { PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(buildJobMetrics(generateMockMetrics(true /* success */, true /* tentative */))); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); doReturn(State.DONE).when(job).getState(); - JobMetrics metrics = buildJobMetrics( - generateMockMetrics(true /* success */, true /* tentative */)); - assertEquals(Optional.of(true), runner.checkForPAssertSuccess(job, metrics)); + assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(true))); } + /** + * Tests that when we just see a tentative failure for a {@link PAssert} it is considered a + * conclusive failure. 
+ */ @Test public void testCheckingForSuccessWhenPAssertFails() throws Exception { DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); @@ -365,11 +331,13 @@ public class TestDataflowRunnerTest { PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())) + .thenReturn( + buildJobMetrics(generateMockMetrics(false /* success */, true /* tentative */))); + + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); doReturn(State.DONE).when(job).getState(); - JobMetrics metrics = buildJobMetrics( - generateMockMetrics(false /* success */, true /* tentative */)); - assertEquals(Optional.of(false), runner.checkForPAssertSuccess(job, metrics)); + assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false))); } @Test @@ -379,108 +347,20 @@ public class TestDataflowRunnerTest { PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - runner.updatePAssertCount(p); - doReturn(State.RUNNING).when(job).getState(); - JobMetrics metrics = buildJobMetrics( - generateMockMetrics(true /* success */, false /* tentative */)); - assertEquals(Optional.absent(), runner.checkForPAssertSuccess(job, metrics)); - } - - @Test - public void testCheckMaxWatermarkWithNoWatermarkMetric() throws IOException { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); - Pipeline p = TestPipeline.create(options); - p.apply(Create.of(1, 2, 3)); - - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( - ImmutableMap.of("no-watermark", new BigDecimal(100)))); - doReturn(State.RUNNING).when(job).getState(); - assertFalse(runner.atMaxWatermark(job, 
metrics)); - } - - @Test - public void testCheckMaxWatermarkWithSingleWatermarkAtMax() throws IOException { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); - Pipeline p = TestPipeline.create(options); - p.apply(Create.of(1, 2, 3)); - - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( - ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); - doReturn(State.RUNNING).when(job).getState(); - assertTrue(runner.atMaxWatermark(job, metrics)); - } - - @Test - public void testCheckMaxWatermarkWithLegacyWatermarkAtMax() throws IOException { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); - Pipeline p = TestPipeline.create(options); - p.apply(Create.of(1, 2, 3)); - - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( - ImmutableMap.of(LEGACY_WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); - doReturn(State.RUNNING).when(job).getState(); - assertTrue(runner.atMaxWatermark(job, metrics)); - } - - @Test - public void testCheckMaxWatermarkWithSingleWatermarkNotAtMax() throws IOException { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); - Pipeline p = TestPipeline.create(options); - p.apply(Create.of(1, 2, 3)); + when(mockClient.getJobMetrics(anyString())) + .thenReturn( + buildJobMetrics(generateMockMetrics(true /* success */, false /* tentative */))); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics - (ImmutableMap.of(WATERMARK_METRIC_SUFFIX, new BigDecimal(100)))); - doReturn(State.RUNNING).when(job).getState(); - assertFalse(runner.atMaxWatermark(job, metrics)); - } - - @Test - public void testCheckMaxWatermarkWithMultipleWatermarksAtMax() throws IOException { - DataflowPipelineJob job = 
spy(new DataflowPipelineJob("test-job", options, null)); - Pipeline p = TestPipeline.create(options); - p.apply(Create.of(1, 2, 3)); - - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( - ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK, - "two" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); - doReturn(State.RUNNING).when(job).getState(); - assertTrue(runner.atMaxWatermark(job, metrics)); - } - - @Test - public void testCheckMaxWatermarkWithMultipleMaxAndNotMaxWatermarks() throws IOException { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); - Pipeline p = TestPipeline.create(options); - p.apply(Create.of(1, 2, 3)); - - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( - ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK, - "two" + WATERMARK_METRIC_SUFFIX, new BigDecimal(100)))); - doReturn(State.RUNNING).when(job).getState(); - assertFalse(runner.atMaxWatermark(job, metrics)); - } - - @Test - public void testCheckMaxWatermarkIgnoresUnrelatedMatrics() throws IOException { - DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); - Pipeline p = TestPipeline.create(options); - p.apply(Create.of(1, 2, 3)); - - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); - JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( - ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK, - "no-watermark", new BigDecimal(100)))); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + runner.updatePAssertCount(p); doReturn(State.RUNNING).when(job).getState(); - assertTrue(runner.atMaxWatermark(job, metrics)); + assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.<Boolean>absent())); } + /** + * Tests that 
if a streaming pipeline terminates with FAIL that the check for PAssert + * success is a conclusive failure. + */ @Test public void testStreamingPipelineFailsIfServiceFails() throws Exception { DataflowPipelineJob job = spy(new DataflowPipelineJob("test-job", options, null)); @@ -488,16 +368,23 @@ public class TestDataflowRunnerTest { PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); doReturn(State.FAILED).when(job).getState(); - assertEquals(Optional.of(false), runner.checkForPAssertSuccess(job, null /* metrics */)); + assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false))); } + /** + * Tests that if a streaming pipeline crash loops for a non-assertion reason that the test run + * throws an {@link AssertionError}. + * + * <p>This is a known limitation/bug of the runner that it does not distinguish the two modes of + * failure. 
+ */ @Test public void testStreamingPipelineFailsIfException() throws Exception { options.setStreaming(true); - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + Pipeline pipeline = TestPipeline.create(options); + PCollection<Integer> pc = pipeline.apply(Create.of(1, 2, 3)); PAssert.that(pc).containsInAnyOrder(1, 2, 3); DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); @@ -521,18 +408,15 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, - true /* tentative */, null /* additionalMetrics */)); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + try { - runner.run(p, mockRunner); - } catch (AssertionError expected) { - assertThat(expected.getMessage(), containsString("FooException")); - verify(mockJob, atLeastOnce()).cancel(); + runner.run(pipeline, mockRunner); + } catch (AssertionError exc) { return; } - // Note that fail throws an AssertionError which is why it is placed out here - // instead of inside the try-catch block. 
fail("AssertionError expected"); } @@ -542,9 +426,9 @@ public class TestDataflowRunnerTest { Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); - when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */, - true /* tentative */, null /* additionalMetrics */)); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); JobMetrics metrics = runner.getJobMetrics(job); assertEquals(1, metrics.getMetrics().size()); @@ -558,8 +442,8 @@ public class TestDataflowRunnerTest { Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); - when(request.execute()).thenThrow(new IOException()); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + when(mockClient.getJobMetrics(anyString())).thenThrow(new IOException()); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); assertNull(runner.getJobMetrics(job)); } @@ -577,12 +461,12 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); p.getOptions().as(TestPipelineOptions.class) .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); - when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */, - true /* tentative */, null /* additionalMetrics */)); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); runner.run(p, mockRunner); } @@ -601,16 +485,16 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = 
Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); p.getOptions().as(TestPipelineOptions.class) .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) .thenReturn(State.DONE); - when(request.execute()) - .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */, - ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */ + )); runner.run(p, mockRunner); } @@ -628,15 +512,20 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); - when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */, - true /* tentative */, null /* additionalMetrics */)); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); runner.run(p, mockRunner); } + /** + * Tests that when a streaming pipeline terminates and doesn't fail due to {@link PAssert} that + * the {@link TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is + * invoked. 
+ */ @Test public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception { options.setStreaming(true); @@ -652,16 +541,15 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) .thenReturn(State.DONE); - when(request.execute()) - .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */, - ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); runner.run(p, mockRunner); } @@ -679,12 +567,12 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestFailureMatcher()); - when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, - true /* tentative */, null /* additionalMetrics */)); + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */)); try { runner.run(p, mockRunner); } catch (AssertionError expected) { @@ -695,6 +583,11 @@ public class TestDataflowRunnerTest { fail("Expected an exception on pipeline failure."); } + /** + * Tests that when a streaming pipeline terminates in FAIL that the {@link 
+ * TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is not + * invoked. + */ @Test public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception { options.setStreaming(true); @@ -710,24 +603,15 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - TestDataflowRunner runner = TestDataflowRunner.fromOptions(options); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestFailureMatcher()); when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) .thenReturn(State.FAILED); - when(request.execute()).thenReturn( - generateMockMetricResponse(false /* success */, true /* tentative */, - ImmutableMap.of(WATERMARK_METRIC_SUFFIX, new BigDecimal(100)))); - try { - runner.run(p, mockRunner); - } catch (AssertionError expected) { - verify(mockJob, Mockito.times(1)).waitUntilFinish(any(Duration.class), - any(JobMessagesHandler.class)); - return; - } - fail("Expected an exception on pipeline failure."); + runner.run(p, mockRunner); + // If the onSuccessMatcher were invoked, it would have crashed here. } static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements
