http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
deleted file mode 100644
index eb068e6..0000000
---
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ /dev/null
@@ -1,655 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.testing;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.dataflow.DataflowClient;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
-import org.apache.beam.runners.dataflow.util.TimeUtil;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.SerializableMatcher;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.Transport;
-import org.apache.beam.sdk.values.PCollection;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/** Tests for {@link TestDataflowRunner}. */
-@RunWith(JUnit4.class)
-public class TestDataflowRunnerTest {
- @Rule public ExpectedException expectedException = ExpectedException.none();
- @Mock private DataflowClient mockClient;
-
- private TestDataflowPipelineOptions options;
-
- @Before
- public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
-
- options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
- options.setAppName("TestAppName");
- options.setProject("test-project");
- options.setTempLocation("gs://test/temp/location");
- options.setTempRoot("gs://test");
- options.setGcpCredential(new TestCredential());
- options.setRunner(TestDataflowRunner.class);
- options.setPathValidatorClass(NoopPathValidator.class);
- }
-
- @Test
- public void testToString() {
- assertEquals("TestDataflowRunner#TestAppName",
- TestDataflowRunner.fromOptions(options).toString());
- }
-
- @Test
- public void testRunBatchJobThatSucceeds() throws Exception {
- Pipeline p = Pipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(true /* success */, true /*
tentative */));
- assertEquals(mockJob, runner.run(p, mockRunner));
- }
-
- @Test
- public void testRunBatchJobThatFails() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.FAILED);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(false /* success */, false /*
tentative */));
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- return;
- }
- // Note that fail throws an AssertionError which is why it is placed out
here
- // instead of inside the try-catch block.
- fail("AssertionError expected");
- }
-
- @Test
- public void testBatchPipelineFailsIfException() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.RUNNING);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
- when(mockJob.waitUntilFinish(any(Duration.class),
any(JobMessagesHandler.class)))
- .thenAnswer(new Answer<State>() {
- @Override
- public State answer(InvocationOnMock invocation) {
- JobMessage message = new JobMessage();
- message.setMessageText("FooException");
- message.setTime(TimeUtil.toCloudTime(Instant.now()));
- message.setMessageImportance("JOB_MESSAGE_ERROR");
- ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
- .process(Arrays.asList(message));
- return State.CANCELLED;
- }
- });
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(false /* success */, true /*
tentative */));
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- assertThat(expected.getMessage(), containsString("FooException"));
- verify(mockJob, never()).cancel();
- return;
- }
- // Note that fail throws an AssertionError which is why it is placed out
here
- // instead of inside the try-catch block.
- fail("AssertionError expected");
- }
-
- /**
- * A streaming job that terminates with no error messages is a success.
- */
- @Test
- public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.waitUntilFinish(any(Duration.class),
any(JobMessagesHandler.class)))
- .thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(true /* success */, true /*
tentative */));
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- runner.run(p, mockRunner);
- }
-
- @Test
- public void testRunStreamingJobNotUsingPAssertThatSucceeds() throws
Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- p.apply(Create.of(1, 2, 3));
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.waitUntilFinish(any(Duration.class),
any(JobMessagesHandler.class)))
- .thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockStreamingMetricResponse(ImmutableMap.<String,
BigDecimal>of()));
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- runner.run(p, mockRunner);
- }
-
- /**
- * Tests that a streaming job with a false {@link PAssert} fails.
- *
- * <p>Currently, this failure is indistinguishable from a non-{@link
PAssert} failure, because it
- * is detected only by failure job messages. With fuller metric support,
this can detect a PAssert
- * failure via metrics and raise an {@link AssertionError} in just that case.
- */
- @Test
- public void testRunStreamingJobThatFails() throws Exception {
- testStreamingPipelineFailsIfException();
- }
-
- private JobMetrics generateMockMetricResponse(boolean success, boolean
tentative)
- throws Exception {
- List<MetricUpdate> metrics = generateMockMetrics(success, tentative);
- return buildJobMetrics(metrics);
- }
-
- private List<MetricUpdate> generateMockMetrics(boolean success, boolean
tentative) {
- MetricStructuredName name = new MetricStructuredName();
- name.setName(success ? "PAssertSuccess" : "PAssertFailure");
- name.setContext(
- tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String,
String>of());
-
- MetricUpdate metric = new MetricUpdate();
- metric.setName(name);
- metric.setScalar(BigDecimal.ONE);
- return Lists.newArrayList(metric);
- }
-
- private JobMetrics generateMockStreamingMetricResponse(Map<String,
- BigDecimal> metricMap) throws IOException {
- return buildJobMetrics(generateMockStreamingMetrics(metricMap));
- }
-
- private List<MetricUpdate> generateMockStreamingMetrics(Map<String,
BigDecimal> metricMap) {
- List<MetricUpdate> metrics = Lists.newArrayList();
- for (Map.Entry<String, BigDecimal> entry : metricMap.entrySet()) {
- MetricStructuredName name = new MetricStructuredName();
- name.setName(entry.getKey());
-
- MetricUpdate metric = new MetricUpdate();
- metric.setName(name);
- metric.setScalar(entry.getValue());
- metrics.add(metric);
- }
- return metrics;
- }
-
- private JobMetrics buildJobMetrics(List<MetricUpdate> metricList) {
- JobMetrics jobMetrics = new JobMetrics();
- jobMetrics.setMetrics(metricList);
- // N.B. Setting the factory is necessary in order to get valid JSON.
- jobMetrics.setFactory(Transport.getJsonFactory());
- return jobMetrics;
- }
-
- /**
- * Tests that a tentative {@code true} from metrics indicates that every
{@link PAssert} has
- * succeeded.
- */
- @Test
- public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(buildJobMetrics(generateMockMetrics(true /* success */,
true /* tentative */)));
-
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- doReturn(State.DONE).when(job).getState();
- assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(true)));
- }
-
- /**
- * Tests that when we just see a tentative failure for a {@link PAssert} it
is considered a
- * conclusive failure.
- */
- @Test
- public void testCheckingForSuccessWhenPAssertFails() throws Exception {
- DataflowPipelineJob job =
- spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(
- buildJobMetrics(generateMockMetrics(false /* success */, true /*
tentative */)));
-
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- doReturn(State.DONE).when(job).getState();
- assertThat(runner.checkForPAssertSuccess(job),
equalTo(Optional.of(false)));
- }
-
- @Test
- public void testCheckingForSuccessSkipsNonTentativeMetrics() throws
Exception {
- DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient,
"test-job", options, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(
- buildJobMetrics(generateMockMetrics(true /* success */, false /*
tentative */)));
-
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- runner.updatePAssertCount(p);
- doReturn(State.RUNNING).when(job).getState();
- assertThat(runner.checkForPAssertSuccess(job),
equalTo(Optional.<Boolean>absent()));
- }
-
- /**
- * Tests that if a streaming pipeline terminates with FAIL that the check
for PAssert
- * success is a conclusive failure.
- */
- @Test
- public void testStreamingPipelineFailsIfServiceFails() throws Exception {
- DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient,
"test-job", options, null));
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- doReturn(State.FAILED).when(job).getState();
- assertThat(runner.checkForPAssertSuccess(job),
equalTo(Optional.of(false)));
- }
-
- /**
- * Tests that if a streaming pipeline crash loops for a non-assertion reason
that the test run
- * throws an {@link AssertionError}.
- *
- * <p>This is a known limitation/bug of the runner that it does not
distinguish the two modes of
- * failure.
- */
- @Test
- public void testStreamingPipelineFailsIfException() throws Exception {
- options.setStreaming(true);
- Pipeline pipeline = TestPipeline.create(options);
- PCollection<Integer> pc = pipeline.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.RUNNING);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
- when(mockJob.waitUntilFinish(any(Duration.class),
any(JobMessagesHandler.class)))
- .thenAnswer(new Answer<State>() {
- @Override
- public State answer(InvocationOnMock invocation) {
- JobMessage message = new JobMessage();
- message.setMessageText("FooException");
- message.setTime(TimeUtil.toCloudTime(Instant.now()));
- message.setMessageImportance("JOB_MESSAGE_ERROR");
- ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
- .process(Arrays.asList(message));
- return State.CANCELLED;
- }
- });
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(false /* success */, true /*
tentative */));
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-
- try {
- runner.run(pipeline, mockRunner);
- } catch (AssertionError exc) {
- return;
- }
- fail("AssertionError expected");
- }
-
- @Test
- public void testGetJobMetricsThatSucceeds() throws Exception {
- DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient,
"test-job", options, null));
- Pipeline p = TestPipeline.create(options);
- p.apply(Create.of(1, 2, 3));
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(true /* success */, true /*
tentative */));
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- JobMetrics metrics = runner.getJobMetrics(job);
-
- assertEquals(1, metrics.getMetrics().size());
- assertEquals(generateMockMetrics(true /* success */, true /* tentative */),
- metrics.getMetrics());
- }
-
- @Test
- public void testGetJobMetricsThatFailsForException() throws Exception {
- DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient,
"test-job", options, null));
- Pipeline p = TestPipeline.create(options);
- p.apply(Create.of(1, 2, 3));
-
- when(mockClient.getJobMetrics(anyString())).thenThrow(new IOException());
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- assertNull(runner.getJobMetrics(job));
- }
-
- @Test
- public void testBatchOnCreateMatcher() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob =
Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- options.as(TestPipelineOptions.class).setOnCreateMatcher(new
TestSuccessMatcher(mockJob, 0));
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(true /* success */, true /*
tentative */));
- runner.run(p, mockRunner);
- }
-
- @Test
- public void testStreamingOnCreateMatcher() throws Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob =
Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- options.as(TestPipelineOptions.class).setOnCreateMatcher(new
TestSuccessMatcher(mockJob, 0));
-
- when(mockJob.waitUntilFinish(any(Duration.class),
any(JobMessagesHandler.class)))
- .thenReturn(State.DONE);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(true /* success */, true /*
tentative */
- ));
- runner.run(p, mockRunner);
- }
-
- @Test
- public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception
{
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob =
Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- options.as(TestPipelineOptions.class).setOnSuccessMatcher(new
TestSuccessMatcher(mockJob, 1));
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(true /* success */, true /*
tentative */));
- runner.run(p, mockRunner);
- }
-
- /**
- * Tests that when a streaming pipeline terminates and doesn't fail due to
{@link PAssert} that
- * the {@link TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher)
on success matcher} is
- * invoked.
- */
- @Test
- public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws
Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob =
Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.DONE);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- options.as(TestPipelineOptions.class).setOnSuccessMatcher(new
TestSuccessMatcher(mockJob, 1));
-
- when(mockJob.waitUntilFinish(any(Duration.class),
any(JobMessagesHandler.class)))
- .thenReturn(State.DONE);
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(true /* success */, true /*
tentative */));
- runner.run(p, mockRunner);
- }
-
- @Test
- public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob =
Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.FAILED);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- options.as(TestPipelineOptions.class).setOnSuccessMatcher(new
TestFailureMatcher());
-
- when(mockClient.getJobMetrics(anyString()))
- .thenReturn(generateMockMetricResponse(false /* success */, true /*
tentative */));
- try {
- runner.run(p, mockRunner);
- } catch (AssertionError expected) {
- verify(mockJob, Mockito.times(1)).waitUntilFinish(
- any(Duration.class), any(JobMessagesHandler.class));
- return;
- }
- fail("Expected an exception on pipeline failure.");
- }
-
- /**
- * Tests that when a streaming pipeline terminates in FAIL that the {@link
- * TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success
matcher} is not
- * invoked.
- */
- @Test
- public void testStreamingOnSuccessMatcherWhenPipelineFails() throws
Exception {
- options.setStreaming(true);
- Pipeline p = TestPipeline.create(options);
- PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
- PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
- final DataflowPipelineJob mockJob =
Mockito.mock(DataflowPipelineJob.class);
- when(mockJob.getState()).thenReturn(State.FAILED);
- when(mockJob.getProjectId()).thenReturn("test-project");
- when(mockJob.getJobId()).thenReturn("test-job");
-
- DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
- when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
- TestDataflowRunner runner =
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- options.as(TestPipelineOptions.class).setOnSuccessMatcher(new
TestFailureMatcher());
-
- when(mockJob.waitUntilFinish(any(Duration.class),
any(JobMessagesHandler.class)))
- .thenReturn(State.FAILED);
-
- runner.run(p, mockRunner);
- // If the onSuccessMatcher were invoked, it would have crashed here.
- }
-
- static class TestSuccessMatcher extends BaseMatcher<PipelineResult>
implements
- SerializableMatcher<PipelineResult> {
- private final DataflowPipelineJob mockJob;
- private final int called;
-
- public TestSuccessMatcher(DataflowPipelineJob job, int times) {
- this.mockJob = job;
- this.called = times;
- }
-
- @Override
- public boolean matches(Object o) {
- if (!(o instanceof PipelineResult)) {
- fail(String.format("Expected PipelineResult but received %s", o));
- }
- try {
- verify(mockJob, Mockito.times(called)).waitUntilFinish(
- any(Duration.class), any(JobMessagesHandler.class));
- } catch (IOException | InterruptedException e) {
- throw new AssertionError(e);
- }
- assertSame(mockJob, o);
- return true;
- }
-
- @Override
- public void describeTo(Description description) {
- }
- }
-
- static class TestFailureMatcher extends BaseMatcher<PipelineResult>
implements
- SerializableMatcher<PipelineResult> {
- @Override
- public boolean matches(Object o) {
- fail("OnSuccessMatcher should not be called on pipeline failure.");
- return false;
- }
-
- @Override
- public void describeTo(Description description) {
- }
- }
-}