More javadoc; keep retrying in case of a get-metrics exception
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/370c1714 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/370c1714 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/370c1714 Branch: refs/heads/master Commit: 370c171450215d3fac4208875850279a796415c9 Parents: fbae96f Author: Mark Liu <mark...@markliu-macbookpro.roam.corp.google.com> Authored: Wed Sep 14 13:18:40 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Tue Sep 27 17:03:57 2016 -0700 ---------------------------------------------------------------------- examples/java/pom.xml | 7 +- .../beam/examples/WindowedWordCountIT.java | 13 +- .../dataflow/testing/TestDataflowRunner.java | 158 +++++----- .../testing/TestDataflowRunnerTest.java | 287 ++++++++++--------- .../apache/beam/sdk/testing/StreamingIT.java | 13 +- 5 files changed, 263 insertions(+), 215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370c1714/examples/java/pom.xml ---------------------------------------------------------------------- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 31244db..6b1b7ce 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -218,11 +218,8 @@ </profile> <!-- - This profile disable streaming integration tests which - have @Category(StreamingIT.class) annotation. - - This profile can be abled on the command line - by specifying -P disable-streaming-ITs. + This profile disables streaming integration tests which + have the @Category(StreamingIT.class) annotation. 
--> <profile> <id>disable-streaming-ITs</id> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370c1714/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index cddcd4a..379d1b0 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.beam.examples; import java.io.IOException; @@ -39,23 +38,25 @@ public class WindowedWordCountIT { /** * Options for the {@link WindowedWordCount} Integration Test. */ - public interface TestOptions extends Options, TestPipelineOptions, StreamingOptions{ + public interface WindowedWordCountITOptions + extends Options, TestPipelineOptions, StreamingOptions { } @Test public void testWindowedWordCountInBatch() throws IOException { - testWindowedWordCountPipeline(false); + testWindowedWordCountPipeline(false /* isStreaming */); } @Test @Category(StreamingIT.class) public void testWindowedWordCountInStreaming() throws IOException { - testWindowedWordCountPipeline(true); + testWindowedWordCountPipeline(true /* isStreaming */); } private void testWindowedWordCountPipeline(boolean isStreaming) throws IOException { - PipelineOptionsFactory.register(TestOptions.class); - TestOptions options = TestPipeline.testingPipelineOptions().as(TestOptions.class); + PipelineOptionsFactory.register(WindowedWordCountITOptions.class); + WindowedWordCountITOptions options = + TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class); options.setStreaming(isStreaming); 
WindowedWordCount.main(TestPipeline.convertToArgs(options)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370c1714/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index b8b4eaf..a152505 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -22,6 +22,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import com.google.api.services.dataflow.model.JobMessage; import com.google.api.services.dataflow.model.JobMetrics; import com.google.api.services.dataflow.model.MetricUpdate; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Throwables; @@ -31,6 +32,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowJobExecutionException; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -110,7 +112,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { job, new MonitoringUtil.LoggingHandler()); try { - final Optional<Boolean> result; + final Optional<Boolean> success; if (options.isStreaming()) { Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit( @@ -119,9 +121,13 @@ public class 
TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { public Optional<Boolean> call() throws Exception { try { for (;;) { - Optional<Boolean> result = checkForSuccess(job); - if (result.isPresent() && (!result.get() || checkMaxWatermark(job))) { - return result; + JobMetrics metrics = getJobMetrics(job); + Optional<Boolean> success = checkForPAssertSuccess(job, metrics); + if (success.isPresent() && (!success.get() || atMaxWatermark(job, metrics))) { + // It's possible that the streaming pipeline doesn't use PAssert. + // So checkForSuccess() will return true before job is finished. + // atMaxWatermark() will handle this case. + return success; } Thread.sleep(10000L); } @@ -139,15 +145,15 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { job.getJobId()); job.cancel(); } - result = resultFuture.get(); + success = resultFuture.get(); } else { job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler); - result = checkForSuccess(job); + success = checkForPAssertSuccess(job, getJobMetrics(job)); } - if (!result.isPresent()) { + if (!success.isPresent()) { throw new IllegalStateException( "The dataflow did not output a success or failure metric."); - } else if (!result.get()) { + } else if (!success.get()) { throw new AssertionError(messageHandler.getErrorMessage() == null ? "The dataflow did not return a failure reason." : messageHandler.getErrorMessage()); @@ -178,7 +184,18 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { return runner.apply(transform, input); } - Optional<Boolean> checkForSuccess(DataflowPipelineJob job) + /** + * Check that PAssert expectations were met. + * + * <p>If the pipeline is not in a failed/cancelled state and no PAsserts were used + * within the pipeline, then this method will state that all PAsserts succeeded. 
+ * + * @return Optional.of(false) if the job failed, was cancelled or if any PAssert + * expectation was not met, true if all the PAssert expectations passed, + * Optional.absent() if the metrics were inconclusive. + */ + @VisibleForTesting + Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job, @Nullable JobMetrics metrics) throws IOException { State state = job.getState(); if (state == State.FAILED || state == State.CANCELLED) { @@ -186,74 +203,85 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { return Optional.of(false); } - JobMetrics metrics = options.getDataflowClient().projects().jobs() - .getMetrics(job.getProjectId(), job.getJobId()).execute(); - if (metrics == null || metrics.getMetrics() == null) { LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId()); - } else { - int successes = 0; - int failures = 0; - for (MetricUpdate metric : metrics.getMetrics()) { - if (metric.getName() == null - || metric.getName().getContext() == null - || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) { - // Don't double count using the non-tentative version of the metric. - continue; - } - if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) { - successes += ((BigDecimal) metric.getScalar()).intValue(); - } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) { - failures += ((BigDecimal) metric.getScalar()).intValue(); - } - } + return Optional.absent(); + } - if (failures > 0) { - LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " - + "{} expected assertions.", job.getJobId(), successes, failures, - expectedNumberOfAssertions); - return Optional.of(false); - } else if (successes >= expectedNumberOfAssertions) { - LOG.info("Found result while running Dataflow job {}. 
Found {} success, {} failures out of " - + "{} expected assertions.", job.getJobId(), successes, failures, - expectedNumberOfAssertions); - return Optional.of(true); + int successes = 0; + int failures = 0; + for (MetricUpdate metric : metrics.getMetrics()) { + if (metric.getName() == null + || metric.getName().getContext() == null + || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) { + // Don't double count using the non-tentative version of the metric. + continue; } + if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) { + successes += ((BigDecimal) metric.getScalar()).intValue(); + } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) { + failures += ((BigDecimal) metric.getScalar()).intValue(); + } + } - LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected " - + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions); + if (failures > 0) { + LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " + + "{} expected assertions.", job.getJobId(), successes, failures, + expectedNumberOfAssertions); + return Optional.of(false); + } else if (successes >= expectedNumberOfAssertions) { + LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " + + "{} expected assertions.", job.getJobId(), successes, failures, + expectedNumberOfAssertions); + return Optional.of(true); } - return Optional.<Boolean>absent(); + LOG.info("Running Dataflow job {}. 
Found {} success, {} failures out of {} expected " + + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions); + return Optional.absent(); } - boolean checkMaxWatermark(DataflowPipelineJob job) throws IOException { - JobMetrics metrics = options.getDataflowClient().projects().jobs() - .getMetrics(job.getProjectId(), job.getJobId()).execute(); - - if (metrics == null || metrics.getMetrics() == null) { - LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId()); - } else { - boolean hasMaxWatermark = false; - for (MetricUpdate metric : metrics.getMetrics()) { - if (metric.getName() == null - || metric.getName().getName() == null - || !metric.getName().getName().endsWith(WATERMARK_METRIC_SUFFIX) - || metric.getScalar() == null) { - continue; - } - BigDecimal watermark = (BigDecimal) metric.getScalar(); - hasMaxWatermark = watermark.longValue() == MAX_WATERMARK_VALUE; - if (!hasMaxWatermark) { - break; - } + /** + * Check watermarks of the streaming job. At least one watermark metric must exist. + * + * @return true if all watermarks are at max, false otherwise. + */ + @VisibleForTesting + boolean atMaxWatermark(DataflowPipelineJob job, JobMetrics metrics) { + boolean hasMaxWatermark = false; + for (MetricUpdate metric : metrics.getMetrics()) { + if (metric.getName() == null + || metric.getName().getName() == null + || !metric.getName().getName().endsWith(WATERMARK_METRIC_SUFFIX) + || metric.getScalar() == null) { + continue; } - if (hasMaxWatermark) { - LOG.info("All watermarks of job {} reach to max value.", job.getJobId()); - return true; + BigDecimal watermark = (BigDecimal) metric.getScalar(); + hasMaxWatermark = watermark.longValue() == MAX_WATERMARK_VALUE; + if (!hasMaxWatermark) { + LOG.info("Found a non-max watermark metric {} in job {}", metric.getName().getName(), + job.getJobId()); + return false; } } - return false; + + if (hasMaxWatermark) { + LOG.info("All watermarks are at max. 
JobID: {}", job.getJobId()); + } + return hasMaxWatermark; + } + + @Nullable + @VisibleForTesting + JobMetrics getJobMetrics(DataflowPipelineJob job) { + JobMetrics metrics = null; + try { + metrics = options.getDataflowClient().projects().jobs() + .getMetrics(job.getProjectId(), job.getJobId()).execute(); + } catch (IOException e) { + LOG.warn("Failed to get job metrics: ", e); + } + return metrics; } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370c1714/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java index 3818b35..e6b513a 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow.testing; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -50,6 +51,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.util.Arrays; import java.util.List; +import java.util.Map; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.util.MonitoringUtil; @@ -93,6 +95,9 @@ public class TestDataflowRunnerTest { @Mock private 
MockLowLevelHttpRequest request; @Mock private GcsUtil mockGcsUtil; + private static final String WATERMARK_METRIC_SUFFIX = "windmill-data-watermark"; + private static final BigDecimal DEFAULT_MAX_WATERMARK = new BigDecimal(-2); + private TestDataflowPipelineOptions options; private Dataflow service; @@ -135,8 +140,8 @@ public class TestDataflowRunnerTest { when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, true /* tentative */)); + when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */, + true /* tentative */, null /* additionalMetrics */)); assertEquals(mockJob, runner.run(p, mockRunner)); } @@ -155,6 +160,8 @@ public class TestDataflowRunnerTest { when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, + false /* tentative */, null /* additionalMetrics */)); try { runner.run(p, mockRunner); } catch (AssertionError expected) { @@ -192,8 +199,8 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - when(request.execute()).thenReturn( - generateMockMetricResponse(false /* success */, true /* tentative */)); + when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, + true /* tentative */, null /* additionalMetrics */)); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); try { runner.run(p, mockRunner); @@ -208,7 +215,7 @@ public class TestDataflowRunnerTest { } @Test - public void testRunStreamingJobThatSucceeds() throws Exception { + public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception { options.setStreaming(true); Pipeline p = 
TestPipeline.create(options); PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); @@ -223,18 +230,14 @@ public class TestDataflowRunnerTest { when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); when(request.execute()) - .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)) - .thenReturn(generateMockStreamingMetricResponse( - true /* hasWatermark */, - true /* maxWatermark */, - false /* multipleWatermarks */, - false /* multipleMaxWatermark */)); + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */, + ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); runner.run(p, mockRunner); } @Test - public void testRunStreamingJobThatReachMaxWatermarkAndSucceeds() throws Exception { + public void testRunStreamingJobNotUsingPAssertThatSucceeds() throws Exception { options.setStreaming(true); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); @@ -249,15 +252,7 @@ public class TestDataflowRunnerTest { when(request.execute()) .thenReturn(generateMockStreamingMetricResponse( - true /* hasWatermark */, - true /* maxWatermark */, - false /* multipleWatermarks */, - false /* multipleMaxWatermark */)) - .thenReturn(generateMockStreamingMetricResponse( - true /* hasWatermark */, - true /* maxWatermark */, - false /* multipleWatermarks */, - false /* multipleMaxWatermark */)); + ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); runner.run(p, mockRunner); } @@ -277,8 +272,8 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - when(request.execute()).thenReturn( - generateMockMetricResponse(false /* success */, true /* tentative */)); + when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, + true 
/* tentative */, null /* additionalMetrics */)); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); try { runner.run(p, mockRunner); @@ -290,41 +285,61 @@ public class TestDataflowRunnerTest { fail("AssertionError expected"); } - private LowLevelHttpResponse generateMockStreamingMetricResponse( - boolean hasWatermark, - boolean maxWatermark, - boolean multipleWatermarks, - boolean multipleMaxWatermark) throws IOException { - List<MetricUpdate> metrics = Lists.newArrayList(); + private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative, + Map<String, BigDecimal> additionalMetrics) + throws Exception { + MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); + response.setContentType(Json.MEDIA_TYPE); + List<MetricUpdate> metrics = generateMockMetrics(success, tentative); + if (additionalMetrics != null && !additionalMetrics.isEmpty()) { + metrics.addAll(generateMockStreamingMetrics(additionalMetrics)); + } + JobMetrics jobMetrics = buildJobMetrics(metrics); + response.setContent(jobMetrics.toPrettyString()); + return response; + } + private List<MetricUpdate> generateMockMetrics(boolean success, boolean tentative) { MetricStructuredName name = new MetricStructuredName(); - name.setName(hasWatermark ? "windmill-data-watermark" : "no-watermark"); - name.setContext(ImmutableMap.<String, String>of()); + name.setName(success ? "PAssertSuccess" : "PAssertFailure"); + name.setContext( + tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of()); MetricUpdate metric = new MetricUpdate(); metric.setName(name); - metric.setScalar(maxWatermark ? BigDecimal.valueOf(-2L) : BigDecimal.ONE); - metrics.add(metric); - - if (multipleWatermarks) { - MetricStructuredName nameTwo = new MetricStructuredName(); - nameTwo.setName(hasWatermark ? 
"windmill-data-watermark" : "no-watermark"); - nameTwo.setContext(ImmutableMap.<String, String>of()); - - MetricUpdate metricTwo = new MetricUpdate(); - metricTwo.setName(nameTwo); - metricTwo.setScalar(multipleMaxWatermark ? BigDecimal.valueOf(-2L) : BigDecimal.ONE); - metrics.add(metricTwo); - } + metric.setScalar(BigDecimal.ONE); + return Lists.newArrayList(metric); + } + private LowLevelHttpResponse generateMockStreamingMetricResponse(Map<String, + BigDecimal> metricMap) throws IOException { MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); response.setContentType(Json.MEDIA_TYPE); + JobMetrics jobMetrics = buildJobMetrics(generateMockStreamingMetrics(metricMap)); + response.setContent(jobMetrics.toPrettyString()); + return response; + } + + private List<MetricUpdate> generateMockStreamingMetrics(Map<String, BigDecimal> metricMap) { + List<MetricUpdate> metrics = Lists.newArrayList(); + for (Map.Entry<String, BigDecimal> entry : metricMap.entrySet()) { + MetricStructuredName name = new MetricStructuredName(); + name.setName(entry.getKey()); + + MetricUpdate metric = new MetricUpdate(); + metric.setName(name); + metric.setScalar(entry.getValue()); + metrics.add(metric); + } + return metrics; + } + + private JobMetrics buildJobMetrics(List<MetricUpdate> metricList) { JobMetrics jobMetrics = new JobMetrics(); - jobMetrics.setMetrics(metrics); + jobMetrics.setMetrics(metricList); // N.B. Setting the factory is necessary in order to get valid JSON. 
jobMetrics.setFactory(Transport.getJsonFactory()); - response.setContent(jobMetrics.toPrettyString()); - return response; + return jobMetrics; } @Test @@ -336,10 +351,10 @@ public class TestDataflowRunnerTest { PAssert.that(pc).containsInAnyOrder(1, 2, 3); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, true /* tentative */)); doReturn(State.DONE).when(job).getState(); - assertEquals(Optional.of(true), runner.checkForSuccess(job)); + JobMetrics metrics = buildJobMetrics( + generateMockMetrics(true /* success */, true /* tentative */)); + assertEquals(Optional.of(true), runner.checkForPAssertSuccess(job, metrics)); } @Test @@ -351,10 +366,10 @@ public class TestDataflowRunnerTest { PAssert.that(pc).containsInAnyOrder(1, 2, 3); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); - when(request.execute()).thenReturn( - generateMockMetricResponse(false /* success */, true /* tentative */)); doReturn(State.DONE).when(job).getState(); - assertEquals(Optional.of(false), runner.checkForSuccess(job)); + JobMetrics metrics = buildJobMetrics( + generateMockMetrics(false /* success */, true /* tentative */)); + assertEquals(Optional.of(false), runner.checkForPAssertSuccess(job, metrics)); } @Test @@ -366,121 +381,97 @@ public class TestDataflowRunnerTest { PAssert.that(pc).containsInAnyOrder(1, 2, 3); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, false /* tentative */)); doReturn(State.RUNNING).when(job).getState(); - assertEquals(Optional.absent(), runner.checkForSuccess(job)); + JobMetrics metrics = buildJobMetrics( + generateMockMetrics(true /* success */, false /* tentative */)); + assertEquals(Optional.absent(), runner.checkForPAssertSuccess(job, metrics)); } - private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative) - 
throws Exception { - MetricStructuredName name = new MetricStructuredName(); - name.setName(success ? "PAssertSuccess" : "PAssertFailure"); - name.setContext( - tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of()); - - MetricUpdate metric = new MetricUpdate(); - metric.setName(name); - metric.setScalar(BigDecimal.ONE); + @Test + public void testCheckMaxWatermarkWithNoWatermarkMetric() throws IOException { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + Pipeline p = TestPipeline.create(options); + p.apply(Create.of(1, 2, 3)); - MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); - response.setContentType(Json.MEDIA_TYPE); - JobMetrics jobMetrics = new JobMetrics(); - jobMetrics.setMetrics(Lists.newArrayList(metric)); - // N.B. Setting the factory is necessary in order to get valid JSON. - jobMetrics.setFactory(Transport.getJsonFactory()); - response.setContent(jobMetrics.toPrettyString()); - return response; + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( + ImmutableMap.of("no-watermark", new BigDecimal(100)))); + doReturn(State.RUNNING).when(job).getState(); + assertFalse(runner.atMaxWatermark(job, metrics)); } @Test - public void testCheckMaxWatermarkWithNoWatermarkMetric() throws IOException { + public void testCheckMaxWatermarkWithSingleWatermarkAtMax() throws IOException { DataflowPipelineJob job = spy(new DataflowPipelineJob("test-project", "test-job", options, null)); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); - when(request.execute()) - .thenReturn(generateMockStreamingMetricResponse( - false /* hasWatermark */, - false /* maxWatermark */, - false /* multipleWatermarks */, - false /* multipleMaxWatermark */)); + JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( + 
ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); doReturn(State.RUNNING).when(job).getState(); - assertFalse(runner.checkMaxWatermark(job)); + assertTrue(runner.atMaxWatermark(job, metrics)); } @Test - public void testCheckMaxWatermarkWithSingleMaxWatermark() throws IOException { + public void testCheckMaxWatermarkWithSingleWatermarkNotAtMax() throws IOException { DataflowPipelineJob job = spy(new DataflowPipelineJob("test-project", "test-job", options, null)); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); - when(request.execute()) - .thenReturn(generateMockStreamingMetricResponse( - true /* hasWatermark */, - true /* maxWatermark */, - false /* multipleWatermarks */, - false /* multipleMaxWatermark */)); + JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics + (ImmutableMap.of(WATERMARK_METRIC_SUFFIX, new BigDecimal(100)))); doReturn(State.RUNNING).when(job).getState(); - assertTrue(runner.checkMaxWatermark(job)); + assertFalse(runner.atMaxWatermark(job, metrics)); } @Test - public void testCheckMaxWatermarkWithSingleWatermarkNotMax() throws IOException { + public void testCheckMaxWatermarkWithMultipleWatermarksAtMax() throws IOException { DataflowPipelineJob job = spy(new DataflowPipelineJob("test-project", "test-job", options, null)); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); - when(request.execute()) - .thenReturn(generateMockStreamingMetricResponse( - true /* hasWatermark */, - false /* maxWatermark */, - false /* multipleWatermarks */, - false /* multipleMaxWatermark */)); + JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( + ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK, + "two" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); doReturn(State.RUNNING).when(job).getState(); - 
assertFalse(runner.checkMaxWatermark(job)); + assertTrue(runner.atMaxWatermark(job, metrics)); } @Test - public void testCheckMaxWatermarkWithMultipleMaxWatermark() throws IOException { + public void testCheckMaxWatermarkWithMultipleMaxAndNotMaxWatermarks() throws IOException { DataflowPipelineJob job = spy(new DataflowPipelineJob("test-project", "test-job", options, null)); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); - when(request.execute()) - .thenReturn(generateMockStreamingMetricResponse( - true /* hasWatermark */, - true /* maxWatermark */, - true /* multipleWatermarks */, - true /* multipleMaxWatermark */)); + JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( + ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK, + "two" + WATERMARK_METRIC_SUFFIX, new BigDecimal(100)))); doReturn(State.RUNNING).when(job).getState(); - assertTrue(runner.checkMaxWatermark(job)); + assertFalse(runner.atMaxWatermark(job, metrics)); } @Test - public void testCheckMaxWatermarkWithMaxAndNotMaxWatermarkMixed() throws IOException { + public void testCheckMaxWatermarkIgnoresUnrelatedMatrics() throws IOException { DataflowPipelineJob job = spy(new DataflowPipelineJob("test-project", "test-job", options, null)); Pipeline p = TestPipeline.create(options); p.apply(Create.of(1, 2, 3)); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); - when(request.execute()) - .thenReturn(generateMockStreamingMetricResponse( - true /* hasWatermark */, - true /* maxWatermark */, - true /* multipleWatermarks */, - false /* multipleMaxWatermark */)); + JobMetrics metrics = buildJobMetrics(generateMockStreamingMetrics( + ImmutableMap.of("one" + WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK, + "no-watermark", new BigDecimal(100)))); doReturn(State.RUNNING).when(job).getState(); - assertFalse(runner.checkMaxWatermark(job)); + assertTrue(runner.atMaxWatermark(job, 
metrics)); } @Test @@ -492,10 +483,8 @@ public class TestDataflowRunnerTest { PAssert.that(pc).containsInAnyOrder(1, 2, 3); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, false /* tentative */)); doReturn(State.FAILED).when(job).getState(); - assertEquals(Optional.of(false), runner.checkForSuccess(job)); + assertEquals(Optional.of(false), runner.checkForPAssertSuccess(job, null /* metrics */)); } @Test @@ -526,8 +515,8 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - when(request.execute()).thenReturn( - generateMockMetricResponse(false /* success */, true /* tentative */)); + when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, + true /* tentative */, null /* additionalMetrics */)); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); try { runner.run(p, mockRunner); @@ -542,6 +531,35 @@ public class TestDataflowRunnerTest { } @Test + public void testGetJobMetricsThatSucceeds() throws Exception { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + Pipeline p = TestPipeline.create(options); + p.apply(Create.of(1, 2, 3)); + + when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */, + true /* tentative */, null /* additionalMetrics */)); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + JobMetrics metrics = runner.getJobMetrics(job); + + assertEquals(1, metrics.getMetrics().size()); + assertEquals(generateMockMetrics(true /* success */, true /* tentative */), + metrics.getMetrics()); + } + + @Test + public void testGetJobMetricsThatFailsForException() throws Exception { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", options, null)); + Pipeline p = 
TestPipeline.create(options); + p.apply(Create.of(1, 2, 3)); + + when(request.execute()).thenThrow(new IOException()); + TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); + assertNull(runner.getJobMetrics(job)); + } + + @Test public void testBatchOnCreateMatcher() throws Exception { Pipeline p = TestPipeline.create(options); PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); @@ -559,8 +577,8 @@ public class TestDataflowRunnerTest { p.getOptions().as(TestPipelineOptions.class) .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, true /* tentative */)); + when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */, + true /* tentative */, null /* additionalMetrics */)); runner.run(p, mockRunner); } @@ -587,12 +605,8 @@ public class TestDataflowRunnerTest { .thenReturn(State.DONE); when(request.execute()) - .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)) - .thenReturn(generateMockStreamingMetricResponse( - true /* hasWatermark */, - true /* maxWatermark */, - false /* multipleWatermarks */, - false /* multipleMaxWatermark */)); + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */, + ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); runner.run(p, mockRunner); } @@ -614,8 +628,8 @@ public class TestDataflowRunnerTest { p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, true /* tentative */)); + when(request.execute()).thenReturn(generateMockMetricResponse(true /* success */, + true /* tentative */, null /* additionalMetrics */)); runner.run(p, mockRunner); } @@ -642,12 +656,8 @@ public class TestDataflowRunnerTest { .thenReturn(State.DONE); when(request.execute()) - .thenReturn(generateMockMetricResponse(true /* success */, 
true /* tentative */)) - .thenReturn(generateMockStreamingMetricResponse( - true /* hasWatermark */, - true /* maxWatermark */, - false /* multipleWatermarks */, - false /* multipleMaxWatermark */)); + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */, + ImmutableMap.of(WATERMARK_METRIC_SUFFIX, DEFAULT_MAX_WATERMARK))); runner.run(p, mockRunner); } @@ -669,8 +679,8 @@ public class TestDataflowRunnerTest { p.getOptions().as(TestPipelineOptions.class) .setOnSuccessMatcher(new TestFailureMatcher()); - when(request.execute()).thenReturn( - generateMockMetricResponse(false /* success */, true /* tentative */)); + when(request.execute()).thenReturn(generateMockMetricResponse(false /* success */, + true /* tentative */, null /* additionalMetrics */)); try { runner.run(p, mockRunner); } catch (AssertionError expected) { @@ -704,7 +714,8 @@ public class TestDataflowRunnerTest { .thenReturn(State.FAILED); when(request.execute()).thenReturn( - generateMockMetricResponse(false /* success */, true /* tentative */)); + generateMockMetricResponse(false /* success */, true /* tentative */, + ImmutableMap.of(WATERMARK_METRIC_SUFFIX, new BigDecimal(100)))); try { runner.run(p, mockRunner); } catch (AssertionError expected) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370c1714/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StreamingIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StreamingIT.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StreamingIT.java index b3dd4a0..4922d83 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StreamingIT.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StreamingIT.java @@ -18,7 +18,18 @@ package org.apache.beam.sdk.testing; /** - * Category tag for validation tests which are expected running in streaming mode. 
+ * Category tag used to mark tests which execute using the Dataflow runner + * in streaming mode. Example usage: + * <pre><code> + * {@literal @}Test + * {@literal @}Category(StreamingIT.class) + * public void testStreamingPipeline() { + * StreamingOptions options = ...; + * options.setStreaming(true); + * StreamingPipeline.main(...); + * } + * </code></pre> */ +@Deprecated public interface StreamingIT { }