Fix checkMaxWatermark causing batch test failed
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f0588a2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f0588a2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f0588a2 Branch: refs/heads/master Commit: 9f0588a2d63dd8538675b128a488ea5fa9c491f2 Parents: f6bd47b Author: Mark Liu <mark...@markliu-macbookpro.roam.corp.google.com> Authored: Wed Sep 7 11:59:02 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Tue Sep 27 17:03:56 2016 -0700 ---------------------------------------------------------------------- .../dataflow/testing/TestDataflowRunner.java | 20 +++------- .../testing/TestDataflowRunnerTest.java | 40 ++++++++++++++------ 2 files changed, 33 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f0588a2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index c569cd4..b8b4eaf 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -120,11 +120,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { try { for (;;) { Optional<Boolean> result = checkForSuccess(job); - if (result.isPresent()) { - return result; - } - result = checkMaxWatermark(job); - if (result.isPresent()) { + if (result.isPresent() && (!result.get() || checkMaxWatermark(job))) { return result; } Thread.sleep(10000L); @@ -217,7 +213,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { + "{} expected assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions); return Optional.of(false); - } else if (successes > 0 && successes >= expectedNumberOfAssertions) { + } else if (successes >= expectedNumberOfAssertions) { LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " + "{} expected assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions); @@ -231,13 +227,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { return Optional.<Boolean>absent(); } - Optional<Boolean> checkMaxWatermark(DataflowPipelineJob job) throws IOException { - State state = job.getState(); - if (state == State.FAILED || state == State.CANCELLED) { - LOG.info("The pipeline {}", state); - return Optional.of(false); - } - + boolean checkMaxWatermark(DataflowPipelineJob job) throws IOException { JobMetrics metrics = options.getDataflowClient().projects().jobs() .getMetrics(job.getProjectId(), job.getJobId()).execute(); @@ -260,10 +250,10 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { } if (hasMaxWatermark) { LOG.info("All watermarks of job {} reach to max value.", job.getJobId()); - return Optional.of(true); + return true; } } - return Optional.absent(); + return false; } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f0588a2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java index 70c4562..3818b35 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -19,8 +19,10 @@ package org.apache.beam.runners.dataflow.testing; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -220,8 +222,13 @@ public class TestDataflowRunnerTest { DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, true /* tentative */)); + when(request.execute()) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)) + .thenReturn(generateMockStreamingMetricResponse( + true /* hasWatermark */, + true /* maxWatermark */, + false /* multipleWatermarks */, + false /* multipleMaxWatermark */)); TestDataflowRunner runner = (TestDataflowRunner) p.getRunner(); runner.run(p, mockRunner); } @@ -401,7 +408,7 @@ public class TestDataflowRunnerTest { false /* multipleWatermarks */, false /* multipleMaxWatermark */)); doReturn(State.RUNNING).when(job).getState(); - assertEquals(Optional.absent(), runner.checkMaxWatermark(job)); + assertFalse(runner.checkMaxWatermark(job)); } @Test @@ -419,7 +426,7 @@ public class TestDataflowRunnerTest { false /* multipleWatermarks */, false /* multipleMaxWatermark */)); doReturn(State.RUNNING).when(job).getState(); - assertEquals(Optional.of(true), runner.checkMaxWatermark(job)); + assertTrue(runner.checkMaxWatermark(job)); } @Test @@ -437,7 +444,7 @@ public class TestDataflowRunnerTest { false /* multipleWatermarks */, false /* multipleMaxWatermark */)); doReturn(State.RUNNING).when(job).getState(); - assertEquals(Optional.absent(), runner.checkMaxWatermark(job)); + assertFalse(runner.checkMaxWatermark(job)); } @Test @@ -455,7 +462,7 @@ public class TestDataflowRunnerTest { true /* multipleWatermarks */, true /* multipleMaxWatermark */)); doReturn(State.RUNNING).when(job).getState(); - assertEquals(Optional.of(true), runner.checkMaxWatermark(job)); + assertTrue(runner.checkMaxWatermark(job)); } @Test @@ -473,7 +480,7 @@ public class TestDataflowRunnerTest { true /* multipleWatermarks */, false /* multipleMaxWatermark */)); doReturn(State.RUNNING).when(job).getState(); - assertEquals(Optional.absent(), runner.checkMaxWatermark(job)); + assertFalse(runner.checkMaxWatermark(job)); } @Test @@ -489,7 +496,6 @@ public class TestDataflowRunnerTest { generateMockMetricResponse(true /* success */, false /* tentative */)); doReturn(State.FAILED).when(job).getState(); assertEquals(Optional.of(false), runner.checkForSuccess(job)); - assertEquals(Optional.of(false), runner.checkMaxWatermark(job)); } @Test @@ -580,8 +586,13 @@ public class TestDataflowRunnerTest { when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) .thenReturn(State.DONE); - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, true /* tentative */)); + when(request.execute()) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)) + .thenReturn(generateMockStreamingMetricResponse( + true /* hasWatermark */, + true /* maxWatermark */, + false /* multipleWatermarks */, + false /* multipleMaxWatermark */)); runner.run(p, mockRunner); } @@ -630,8 +641,13 @@ public class TestDataflowRunnerTest { when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) .thenReturn(State.DONE); - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, true /* tentative */)); + when(request.execute()) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)) + .thenReturn(generateMockStreamingMetricResponse( + true /* hasWatermark */, + true /* maxWatermark */, + false /* multipleWatermarks */, + false /* multipleMaxWatermark */)); runner.run(p, mockRunner); }