Repository: incubator-beam Updated Branches: refs/heads/master 81dcd5278 -> 8a9dcc71f
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java deleted file mode 100644 index fbaf116..0000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java +++ /dev/null @@ -1,601 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.testing; - -import static org.hamcrest.Matchers.containsString; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import org.apache.beam.runners.dataflow.DataflowPipelineJob; -import org.apache.beam.runners.dataflow.DataflowPipelineRunner; -import org.apache.beam.runners.dataflow.util.MonitoringUtil; -import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler; -import org.apache.beam.runners.dataflow.util.TimeUtil; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.PipelineResult.State; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.SerializableMatcher; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestPipelineOptions; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.NoopPathValidator; -import org.apache.beam.sdk.util.TestCredential; -import org.apache.beam.sdk.util.Transport; -import org.apache.beam.sdk.values.PCollection; - -import com.google.api.client.http.LowLevelHttpResponse; -import com.google.api.client.json.Json; -import com.google.api.client.testing.http.MockHttpTransport; -import com.google.api.client.testing.http.MockLowLevelHttpRequest; -import com.google.api.client.testing.http.MockLowLevelHttpResponse; -import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.dataflow.model.JobMessage; -import com.google.api.services.dataflow.model.JobMetrics; -import com.google.api.services.dataflow.model.MetricStructuredName; -import com.google.api.services.dataflow.model.MetricUpdate; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; - -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.io.IOException; -import java.math.BigDecimal; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; - -/** Tests for {@link TestDataflowPipelineRunner}. */ -@RunWith(JUnit4.class) -public class TestDataflowPipelineRunnerTest { - @Rule public ExpectedException expectedException = ExpectedException.none(); - @Mock private MockHttpTransport transport; - @Mock private MockLowLevelHttpRequest request; - @Mock private GcsUtil mockGcsUtil; - - private TestDataflowPipelineOptions options; - private Dataflow service; - - @Before - public void setUp() throws Exception { - MockitoAnnotations.initMocks(this); - when(transport.buildRequest(anyString(), anyString())).thenReturn(request); - doCallRealMethod().when(request).getContentAsString(); - service = new Dataflow(transport, Transport.getJsonFactory(), null); - - options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); - options.setAppName("TestAppName"); - options.setProject("test-project"); - options.setTempLocation("gs://test/temp/location"); - options.setTempRoot("gs://test"); - options.setGcpCredential(new TestCredential()); - options.setDataflowClient(service); - options.setRunner(TestDataflowPipelineRunner.class); - options.setPathValidatorClass(NoopPathValidator.class); - } - - @Test - public void testToString() { - assertEquals("TestDataflowPipelineRunner#TestAppName", - new TestDataflowPipelineRunner(options).toString()); - } - - @Test - public void testRunBatchJobThatSucceeds() throws Exception { - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); - when(mockJob.getState()).thenReturn(State.DONE); - when(mockJob.getProjectId()).thenReturn("test-project"); - when(mockJob.getJobId()).thenReturn("test-job"); - - DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); - when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, true /* tentative */)); - assertEquals(mockJob, runner.run(p, mockRunner)); - } - - @Test - public void testRunBatchJobThatFails() throws Exception { - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); - when(mockJob.getState()).thenReturn(State.FAILED); - when(mockJob.getProjectId()).thenReturn("test-project"); - when(mockJob.getJobId()).thenReturn("test-job"); - - DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); - when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - try { - runner.run(p, mockRunner); - } catch (AssertionError expected) { - return; - } - // Note that fail throws an AssertionError which is why it is placed out here - // instead of inside the try-catch block. - fail("AssertionError expected"); - } - - @Test - public void testBatchPipelineFailsIfException() throws Exception { - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); - when(mockJob.getState()).thenReturn(State.RUNNING); - when(mockJob.getProjectId()).thenReturn("test-project"); - when(mockJob.getJobId()).thenReturn("test-job"); - when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class))) - .thenAnswer(new Answer<State>() { - @Override - public State answer(InvocationOnMock invocation) { - JobMessage message = new JobMessage(); - message.setMessageText("FooException"); - message.setTime(TimeUtil.toCloudTime(Instant.now())); - message.setMessageImportance("JOB_MESSAGE_ERROR"); - ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2]) - .process(Arrays.asList(message)); - return State.CANCELLED; - } - }); - - DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); - when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - - when(request.execute()).thenReturn( - generateMockMetricResponse(false /* success */, true /* tentative */)); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - try { - runner.run(p, mockRunner); - } catch (AssertionError expected) { - assertThat(expected.getMessage(), containsString("FooException")); - verify(mockJob, atLeastOnce()).cancel(); - return; - } - // Note that fail throws an AssertionError which is why it is placed out here - // instead of inside the try-catch block. - fail("AssertionError expected"); - } - - @Test - public void testRunStreamingJobThatSucceeds() throws Exception { - options.setStreaming(true); - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); - when(mockJob.getState()).thenReturn(State.RUNNING); - when(mockJob.getProjectId()).thenReturn("test-project"); - when(mockJob.getJobId()).thenReturn("test-job"); - - DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); - when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, true /* tentative */)); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - runner.run(p, mockRunner); - } - - @Test - public void testRunStreamingJobThatFails() throws Exception { - options.setStreaming(true); - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); - when(mockJob.getState()).thenReturn(State.RUNNING); - when(mockJob.getProjectId()).thenReturn("test-project"); - when(mockJob.getJobId()).thenReturn("test-job"); - - DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); - when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - - when(request.execute()).thenReturn( - generateMockMetricResponse(false /* success */, true /* tentative */)); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - try { - runner.run(p, mockRunner); - } catch (AssertionError expected) { - return; - } - // Note that fail throws an AssertionError which is why it is placed out here - // instead of inside the try-catch block. - fail("AssertionError expected"); - } - - @Test - public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", service, null)); - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, true /* tentative */)); - doReturn(State.DONE).when(job).getState(); - assertEquals(Optional.of(true), runner.checkForSuccess(job)); - } - - @Test - public void testCheckingForSuccessWhenPAssertFails() throws Exception { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", service, null)); - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - when(request.execute()).thenReturn( - generateMockMetricResponse(false /* success */, true /* tentative */)); - doReturn(State.DONE).when(job).getState(); - assertEquals(Optional.of(false), runner.checkForSuccess(job)); - } - - @Test - public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", service, null)); - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, false /* tentative */)); - doReturn(State.RUNNING).when(job).getState(); - assertEquals(Optional.absent(), runner.checkForSuccess(job)); - } - - private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative) - throws Exception { - MetricStructuredName name = new MetricStructuredName(); - name.setName(success ? "PAssertSuccess" : "PAssertFailure"); - name.setContext( - tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of()); - - MetricUpdate metric = new MetricUpdate(); - metric.setName(name); - metric.setScalar(BigDecimal.ONE); - - MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); - response.setContentType(Json.MEDIA_TYPE); - JobMetrics jobMetrics = new JobMetrics(); - jobMetrics.setMetrics(Lists.newArrayList(metric)); - // N.B. Setting the factory is necessary in order to get valid JSON. - jobMetrics.setFactory(Transport.getJsonFactory()); - response.setContent(jobMetrics.toPrettyString()); - return response; - } - - @Test - public void testStreamingPipelineFailsIfServiceFails() throws Exception { - DataflowPipelineJob job = - spy(new DataflowPipelineJob("test-project", "test-job", service, null)); - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, false /* tentative */)); - doReturn(State.FAILED).when(job).getState(); - assertEquals(Optional.of(false), runner.checkForSuccess(job)); - } - - @Test - public void testStreamingPipelineFailsIfException() throws Exception { - options.setStreaming(true); - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); - when(mockJob.getState()).thenReturn(State.RUNNING); - when(mockJob.getProjectId()).thenReturn("test-project"); - when(mockJob.getJobId()).thenReturn("test-job"); - when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class))) - .thenAnswer(new Answer<State>() { - @Override - public State answer(InvocationOnMock invocation) { - JobMessage message = new JobMessage(); - message.setMessageText("FooException"); - message.setTime(TimeUtil.toCloudTime(Instant.now())); - message.setMessageImportance("JOB_MESSAGE_ERROR"); - ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2]) - .process(Arrays.asList(message)); - return State.CANCELLED; - } - }); - - DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); - when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - - when(request.execute()).thenReturn( - generateMockMetricResponse(false /* success */, true /* tentative */)); - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - try { - runner.run(p, mockRunner); - } catch (AssertionError expected) { - assertThat(expected.getMessage(), containsString("FooException")); - verify(mockJob, atLeastOnce()).cancel(); - return; - } - // Note that fail throws an AssertionError which is why it is placed out here - // instead of inside the try-catch block. - fail("AssertionError expected"); - } - - @Test - public void testBatchOnCreateMatcher() throws Exception { - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); - when(mockJob.getState()).thenReturn(State.DONE); - when(mockJob.getProjectId()).thenReturn("test-project"); - when(mockJob.getJobId()).thenReturn("test-job"); - - DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); - when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - p.getOptions().as(TestPipelineOptions.class) - .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); - - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, true /* tentative */)); - runner.run(p, mockRunner); - } - - @Test - public void testStreamingOnCreateMatcher() throws Exception { - options.setStreaming(true); - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); - when(mockJob.getState()).thenReturn(State.DONE); - when(mockJob.getProjectId()).thenReturn("test-project"); - when(mockJob.getJobId()).thenReturn("test-job"); - - DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); - when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - p.getOptions().as(TestPipelineOptions.class) - .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); - - when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class))) - .thenReturn(State.DONE); - - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, true /* tentative */)); - runner.run(p, mockRunner); - } - - @Test - public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception { - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); - when(mockJob.getState()).thenReturn(State.DONE); - when(mockJob.getProjectId()).thenReturn("test-project"); - when(mockJob.getJobId()).thenReturn("test-job"); - - DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); - when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - p.getOptions().as(TestPipelineOptions.class) - .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); - - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, true /* tentative */)); - runner.run(p, mockRunner); - } - - @Test - public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception { - options.setStreaming(true); - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); - when(mockJob.getState()).thenReturn(State.DONE); - when(mockJob.getProjectId()).thenReturn("test-project"); - when(mockJob.getJobId()).thenReturn("test-job"); - - DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); - when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - p.getOptions().as(TestPipelineOptions.class) - .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); - - when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class))) - .thenReturn(State.DONE); - - when(request.execute()).thenReturn( - generateMockMetricResponse(true /* success */, true /* tentative */)); - runner.run(p, mockRunner); - } - - @Test - public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception { - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); - when(mockJob.getState()).thenReturn(State.FAILED); - when(mockJob.getProjectId()).thenReturn("test-project"); - when(mockJob.getJobId()).thenReturn("test-job"); - - DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); - when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - p.getOptions().as(TestPipelineOptions.class) - .setOnSuccessMatcher(new TestFailureMatcher()); - - when(request.execute()).thenReturn( - generateMockMetricResponse(false /* success */, true /* tentative */)); - try { - runner.run(p, mockRunner); - } catch (AssertionError expected) { - verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class), - any(JobMessagesHandler.class)); - return; - } - fail("Expected an exception on pipeline failure."); - } - - @Test - public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception { - options.setStreaming(true); - Pipeline p = TestPipeline.create(options); - PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); - PAssert.that(pc).containsInAnyOrder(1, 2, 3); - - final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); - when(mockJob.getState()).thenReturn(State.FAILED); - when(mockJob.getProjectId()).thenReturn("test-project"); - when(mockJob.getJobId()).thenReturn("test-job"); - - DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); - when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); - - TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); - p.getOptions().as(TestPipelineOptions.class) - .setOnSuccessMatcher(new TestFailureMatcher()); - - when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class))) - .thenReturn(State.FAILED); - - when(request.execute()).thenReturn( - generateMockMetricResponse(false /* success */, true /* tentative */)); - try { - runner.run(p, mockRunner); - } catch (AssertionError expected) { - verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class), - any(JobMessagesHandler.class)); - return; - } - fail("Expected an exception on pipeline failure."); - } - - static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements - SerializableMatcher<PipelineResult> { - private final DataflowPipelineJob mockJob; - private final int called; - - public TestSuccessMatcher(DataflowPipelineJob job, int times) { - this.mockJob = job; - this.called = times; - } - - @Override - public boolean matches(Object o) { - if (!(o instanceof PipelineResult)) { - fail(String.format("Expected PipelineResult but received %s", o)); - } - try { - verify(mockJob, Mockito.times(called)).waitToFinish(any(Long.class), any(TimeUnit.class), - any(JobMessagesHandler.class)); - } catch (IOException | InterruptedException e) { - throw new AssertionError(e); - } - assertSame(mockJob, o); - return true; - } - - @Override - public void describeTo(Description description) { - } - } - - static class TestFailureMatcher extends BaseMatcher<PipelineResult> implements - SerializableMatcher<PipelineResult> { - @Override - public boolean matches(Object o) { - fail("OnSuccessMatcher should not be called on pipeline failure."); - return false; - } - - @Override - public void describeTo(Description description) { - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java new file mode 100644 index 0000000..4067f08 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.testing; + +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.dataflow.DataflowPipelineJob; +import org.apache.beam.runners.dataflow.DataflowRunner; +import org.apache.beam.runners.dataflow.util.MonitoringUtil; +import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler; +import org.apache.beam.runners.dataflow.util.TimeUtil; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.SerializableMatcher; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.NoopPathValidator; +import org.apache.beam.sdk.util.TestCredential; +import org.apache.beam.sdk.util.Transport; +import org.apache.beam.sdk.values.PCollection; + +import com.google.api.client.http.LowLevelHttpResponse; +import com.google.api.client.json.Json; +import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.client.testing.http.MockLowLevelHttpRequest; +import com.google.api.client.testing.http.MockLowLevelHttpResponse; +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.model.JobMessage; +import com.google.api.services.dataflow.model.JobMetrics; +import com.google.api.services.dataflow.model.MetricStructuredName; +import com.google.api.services.dataflow.model.MetricUpdate; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +/** Tests for {@link TestDataflowPipelineRunner}. */ +@RunWith(JUnit4.class) +public class TestDataflowRunnerTest { + @Rule public ExpectedException expectedException = ExpectedException.none(); + @Mock private MockHttpTransport transport; + @Mock private MockLowLevelHttpRequest request; + @Mock private GcsUtil mockGcsUtil; + + private TestDataflowPipelineOptions options; + private Dataflow service; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + when(transport.buildRequest(anyString(), anyString())).thenReturn(request); + doCallRealMethod().when(request).getContentAsString(); + service = new Dataflow(transport, Transport.getJsonFactory(), null); + + options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); + options.setAppName("TestAppName"); + options.setProject("test-project"); + options.setTempLocation("gs://test/temp/location"); + options.setTempRoot("gs://test"); + options.setGcpCredential(new TestCredential()); + options.setDataflowClient(service); + options.setRunner(TestDataflowPipelineRunner.class); + options.setPathValidatorClass(NoopPathValidator.class); + } + + @Test + public void testToString() { + assertEquals("TestDataflowPipelineRunner#TestAppName", + new TestDataflowPipelineRunner(options).toString()); + } + + @Test + public void testRunBatchJobThatSucceeds() throws Exception { + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + when(request.execute()).thenReturn( + generateMockMetricResponse(true /* success */, true /* tentative */)); + assertEquals(mockJob, runner.run(p, mockRunner)); + } + + @Test + public void testRunBatchJobThatFails() throws Exception { + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.FAILED); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + try { + runner.run(p, mockRunner); + } catch (AssertionError expected) { + return; + } + // Note that fail throws an AssertionError which is why it is placed out here + // instead of inside the try-catch block. + fail("AssertionError expected"); + } + + @Test + public void testBatchPipelineFailsIfException() throws Exception { + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.RUNNING); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class))) + .thenAnswer(new Answer<State>() { + @Override + public State answer(InvocationOnMock invocation) { + JobMessage message = new JobMessage(); + message.setMessageText("FooException"); + message.setTime(TimeUtil.toCloudTime(Instant.now())); + message.setMessageImportance("JOB_MESSAGE_ERROR"); + ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2]) + .process(Arrays.asList(message)); + return State.CANCELLED; + } + }); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(request.execute()).thenReturn( + generateMockMetricResponse(false /* success */, true /* tentative */)); + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + try { + runner.run(p, mockRunner); + } catch (AssertionError expected) { + assertThat(expected.getMessage(), containsString("FooException")); + verify(mockJob, atLeastOnce()).cancel(); + return; + } + // Note that fail throws an AssertionError which is why it is placed out here + // instead of inside the try-catch block. + fail("AssertionError expected"); + } + + @Test + public void testRunStreamingJobThatSucceeds() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.RUNNING); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(request.execute()).thenReturn( + generateMockMetricResponse(true /* success */, true /* tentative */)); + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + runner.run(p, mockRunner); + } + + @Test + public void testRunStreamingJobThatFails() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.RUNNING); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(request.execute()).thenReturn( + generateMockMetricResponse(false /* success */, true /* tentative */)); + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + try { + runner.run(p, mockRunner); + } catch (AssertionError expected) { + return; + } + // Note that fail throws an AssertionError which is why it is placed out here + // instead of inside the try-catch block. + fail("AssertionError expected"); + } + + @Test + public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", service, null)); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + when(request.execute()).thenReturn( + generateMockMetricResponse(true /* success */, true /* tentative */)); + doReturn(State.DONE).when(job).getState(); + assertEquals(Optional.of(true), runner.checkForSuccess(job)); + } + + @Test + public void testCheckingForSuccessWhenPAssertFails() throws Exception { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", service, null)); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + when(request.execute()).thenReturn( + generateMockMetricResponse(false /* success */, true /* tentative */)); + doReturn(State.DONE).when(job).getState(); + assertEquals(Optional.of(false), runner.checkForSuccess(job)); + } + + @Test + public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", service, null)); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + when(request.execute()).thenReturn( + generateMockMetricResponse(true /* success */, false /* tentative */)); + doReturn(State.RUNNING).when(job).getState(); + assertEquals(Optional.absent(), runner.checkForSuccess(job)); + } + + private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative) + throws Exception { + MetricStructuredName name = new MetricStructuredName(); + name.setName(success ? "PAssertSuccess" : "PAssertFailure"); + name.setContext( + tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of()); + + MetricUpdate metric = new MetricUpdate(); + metric.setName(name); + metric.setScalar(BigDecimal.ONE); + + MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); + response.setContentType(Json.MEDIA_TYPE); + JobMetrics jobMetrics = new JobMetrics(); + jobMetrics.setMetrics(Lists.newArrayList(metric)); + // N.B. Setting the factory is necessary in order to get valid JSON. + jobMetrics.setFactory(Transport.getJsonFactory()); + response.setContent(jobMetrics.toPrettyString()); + return response; + } + + @Test + public void testStreamingPipelineFailsIfServiceFails() throws Exception { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", service, null)); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + when(request.execute()).thenReturn( + generateMockMetricResponse(true /* success */, false /* tentative */)); + doReturn(State.FAILED).when(job).getState(); + assertEquals(Optional.of(false), runner.checkForSuccess(job)); + } + + @Test + public void testStreamingPipelineFailsIfException() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.RUNNING); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class))) + .thenAnswer(new Answer<State>() { + @Override + public State answer(InvocationOnMock invocation) { + JobMessage message = new JobMessage(); + message.setMessageText("FooException"); + message.setTime(TimeUtil.toCloudTime(Instant.now())); + message.setMessageImportance("JOB_MESSAGE_ERROR"); + ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2]) + .process(Arrays.asList(message)); + return State.CANCELLED; + } + }); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(request.execute()).thenReturn( + generateMockMetricResponse(false /* success */, true /* tentative */)); + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + try { + runner.run(p, mockRunner); + } catch (AssertionError expected) { + assertThat(expected.getMessage(), containsString("FooException")); + verify(mockJob, atLeastOnce()).cancel(); + return; + } + // Note that fail throws an AssertionError which is why it is placed out here + // instead of inside the try-catch block. + fail("AssertionError expected"); + } + + @Test + public void testBatchOnCreateMatcher() throws Exception { + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + p.getOptions().as(TestPipelineOptions.class) + .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); + + when(request.execute()).thenReturn( + generateMockMetricResponse(true /* success */, true /* tentative */)); + runner.run(p, mockRunner); + } + + @Test + public void testStreamingOnCreateMatcher() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + p.getOptions().as(TestPipelineOptions.class) + .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0)); + + when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class))) + .thenReturn(State.DONE); + + when(request.execute()).thenReturn( + generateMockMetricResponse(true /* success */, true /* tentative */)); + runner.run(p, mockRunner); + } + + @Test + public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception { + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + p.getOptions().as(TestPipelineOptions.class) + .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); + + when(request.execute()).thenReturn( + generateMockMetricResponse(true /* success */, true /* tentative */)); + runner.run(p, mockRunner); + } + + @Test + public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + p.getOptions().as(TestPipelineOptions.class) + .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1)); + + when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class))) + .thenReturn(State.DONE); + + when(request.execute()).thenReturn( + generateMockMetricResponse(true /* success */, true /* tentative */)); + runner.run(p, mockRunner); + } + + @Test + public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception { + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.FAILED); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + p.getOptions().as(TestPipelineOptions.class) + .setOnSuccessMatcher(new TestFailureMatcher()); + + when(request.execute()).thenReturn( + generateMockMetricResponse(false /* success */, true /* tentative */)); + try { + runner.run(p, mockRunner); + } catch (AssertionError expected) { + verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class), + any(JobMessagesHandler.class)); + return; + } + fail("Expected an exception on pipeline failure."); + } + + @Test + public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection<Integer> pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.FAILED); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + p.getOptions().as(TestPipelineOptions.class) + .setOnSuccessMatcher(new TestFailureMatcher()); + + when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class))) + .thenReturn(State.FAILED); + + when(request.execute()).thenReturn( + generateMockMetricResponse(false /* success */, true /* tentative */)); + try { + runner.run(p, mockRunner); + } catch (AssertionError expected) { + verify(mockJob, Mockito.times(1)).waitToFinish(any(Long.class), any(TimeUnit.class), + any(JobMessagesHandler.class)); + return; + } + fail("Expected an exception on pipeline failure."); + } + + static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements + SerializableMatcher<PipelineResult> { + private final DataflowPipelineJob mockJob; + private final int called; + + public TestSuccessMatcher(DataflowPipelineJob job, int times) { + this.mockJob = job; + this.called = times; + } + + @Override + public boolean matches(Object o) { + if (!(o instanceof PipelineResult)) { + fail(String.format("Expected PipelineResult but received %s", o)); + } + try { + verify(mockJob, Mockito.times(called)).waitToFinish(any(Long.class), any(TimeUnit.class), + any(JobMessagesHandler.class)); + } catch (IOException | InterruptedException e) { + throw new AssertionError(e); + } + assertSame(mockJob, o); + return true; + } + + @Override + public void describeTo(Description description) { + } + } + + static class TestFailureMatcher extends BaseMatcher<PipelineResult> implements + SerializableMatcher<PipelineResult> { + @Override + public boolean matches(Object o) { + fail("OnSuccessMatcher should not be called on pipeline failure."); + return false; + } + + @Override + public void describeTo(Description description) { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java index 0b865c3..d809cc6 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.dataflow.transforms; -import org.apache.beam.runners.dataflow.DataflowPipelineRunner; +import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.options.GcpOptions; @@ -30,7 +30,7 @@ import com.google.common.collect.Lists; /** * Factory methods for creating {@link DisplayDataEvaluator} instances against the - * {@link DataflowPipelineRunner}. + * {@link DataflowRunner}. */ public final class DataflowDisplayDataEvaluator { /** Do not instantiate. */ @@ -43,7 +43,7 @@ public final class DataflowDisplayDataEvaluator { public static DataflowPipelineOptions getDefaultOptions() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); + options.setRunner(DataflowRunner.class); options.setProject("foobar"); options.setTempLocation("gs://bucket/tmpLocation"); options.setFilesToStage(Lists.<String>newArrayList()); @@ -56,7 +56,7 @@ public final class DataflowDisplayDataEvaluator { /** * Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against - * the {@link DataflowPipelineRunner}. + * the {@link DataflowRunner}. */ public static DisplayDataEvaluator create() { return create(getDefaultOptions()); @@ -64,7 +64,7 @@ public final class DataflowDisplayDataEvaluator { /** * Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against - * the {@link DataflowPipelineRunner} with the specified {@code options}. + * the {@link DataflowRunner} with the specified {@code options}. */ public static DisplayDataEvaluator create(DataflowPipelineOptions options) { return DisplayDataEvaluator.create(options); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java index f0e677e..a44b8a7 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.dataflow.transforms; -import org.apache.beam.runners.dataflow.DataflowPipelineRunner; +import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; @@ -46,20 +46,20 @@ import org.junit.runners.JUnit4; import java.util.Arrays; import java.util.List; -/** Tests for {@link GroupByKey} for the {@link DataflowPipelineRunner}. */ +/** Tests for {@link GroupByKey} for the {@link DataflowRunner}. */ @RunWith(JUnit4.class) public class DataflowGroupByKeyTest { @Rule public ExpectedException thrown = ExpectedException.none(); /** - * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey} + * Create a test pipeline that uses the {@link DataflowRunner} so that {@link GroupByKey} * is not expanded. This is used for verifying that even without expansion the proper errors show * up. */ private Pipeline createTestServiceRunner() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); + options.setRunner(DataflowRunner.class); options.setProject("someproject"); options.setStagingLocation("gs://staging"); options.setPathValidatorClass(NoopPathValidator.class); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java index d787500..1b263d2 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.dataflow.transforms; -import org.apache.beam.runners.dataflow.DataflowPipelineRunner; +import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -44,7 +44,7 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** Tests for {@link View} for a {@link DataflowPipelineRunner}. */ +/** Tests for {@link View} for a {@link DataflowRunner}. */ @RunWith(JUnit4.class) public class DataflowViewTest { @Rule @@ -52,7 +52,7 @@ public class DataflowViewTest { private Pipeline createTestBatchRunner() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); + options.setRunner(DataflowRunner.class); options.setProject("someproject"); options.setStagingLocation("gs://staging"); options.setPathValidatorClass(NoopPathValidator.class); @@ -62,7 +62,7 @@ public class DataflowViewTest { private Pipeline createTestStreamingRunner() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); + options.setRunner(DataflowRunner.class); options.setStreaming(true); options.setProject("someproject"); options.setStagingLocation("gs://staging"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java index 5587986..a91f56c 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java @@ -21,7 +21,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.when; -import org.apache.beam.runners.dataflow.DataflowPipelineRunner; +import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.util.GcsUtil; @@ -52,7 +52,7 @@ public class DataflowPathValidatorTest { when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod(); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setGcpCredential(new TestCredential()); - options.setRunner(DataflowPipelineRunner.class); + options.setRunner(DataflowRunner.class); options.setGcsUtil(mockGcsUtil); validator = new DataflowPathValidator(options); } @@ -66,7 +66,7 @@ public class DataflowPathValidatorTest { public void testInvalidFilePattern() { expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage( - "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'"); + "DataflowRunner expected a valid 'gs://' path but was given '/local/path'"); validator.validateInputFilePatternSupported("/local/path"); } @@ -88,7 +88,7 @@ public class DataflowPathValidatorTest { public void testInvalidOutputPrefix() { expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage( - "DataflowPipelineRunner expected a valid 'gs://' path but was given '/local/path'"); + "DataflowRunner expected a valid 'gs://' path but was given '/local/path'"); validator.validateOutputFilePrefixSupported("/local/path"); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java index 61ad24f..2b4464d 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java @@ -113,7 +113,7 @@ public class SimpleWordCountTest { String[] words = WORD_BOUNDARY.split(c.element()); // Keep track of the number of lines without any words encountered while tokenizing. - // This aggregator is visible in the monitoring UI when run using DataflowPipelineRunner. + // This aggregator is visible in the monitoring UI when run using DataflowRunner. if (words.length == 0) { emptyLines.addValue(1L); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java index 22a2241..de3c152 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java @@ -158,7 +158,7 @@ public class SerializationTest { String[] words = WORD_BOUNDARY.split(c.element().toString()); // Keep track of the number of lines without any words encountered while tokenizing. - // This aggregator is visible in the monitoring UI when run using DataflowPipelineRunner. + // This aggregator is visible in the monitoring UI when run using DataflowRunner. if (words.length == 0) { emptyLines.addValue(1L); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/package-info.java index 479090f..8719384 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/package-info.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/package-info.java @@ -17,16 +17,16 @@ */ /** * Defines runners for executing Pipelines in different modes, including - * {@link org.apache.beam.sdk.runners.DirectPipelineRunner} and - * {@link org.apache.beam.sdk.runners.DataflowPipelineRunner}. + * {@link org.apache.beam.sdk.runners.DirectRunner} and + * {@link org.apache.beam.sdk.runners.DataflowRunner}. * - * <p>{@link org.apache.beam.sdk.runners.DirectPipelineRunner} executes a {@code Pipeline} + * <p>{@link org.apache.beam.sdk.runners.DirectRunner} executes a {@code Pipeline} * locally, without contacting the Dataflow service. - * {@link org.apache.beam.sdk.runners.DataflowPipelineRunner} submits a + * {@link org.apache.beam.sdk.runners.DataflowRunner} submits a * {@code Pipeline} to the Dataflow service, which executes it on Dataflow-managed Compute Engine - * instances. {@code DataflowPipelineRunner} returns + * instances. {@code DataflowRunner} returns * as soon as the {@code Pipeline} has been submitted. Use - * {@link org.apache.beam.sdk.runners.BlockingDataflowPipelineRunner} to have execution + * {@link org.apache.beam.sdk.runners.BlockingDataflowRunner} to have execution * updates printed to the console. * * <p>The runner is specified as part {@link org.apache.beam.sdk.options.PipelineOptions}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index 0dba043..b901268 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -60,7 +60,7 @@ import javax.annotation.Nullable; * <li>System property "beamTestPipelineOptions" must contain a JSON delimited list of pipeline * options. For example: * <pre>{@code [ - * "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowPipelineRunner", + * "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner", * "--project=mygcpproject", * "--stagingLocation=gs://mygcsbucket/path" * ]}</pre> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java index 4c98123..329dec5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; * {@code Aggregator} by calling {@link Aggregator#addValue}. * * <p>Aggregators are visible in the monitoring UI, when the pipeline is run - * using DataflowPipelineRunner or BlockingDataflowPipelineRunner, along with + * using DataflowRunner or BlockingDataflowRunner, along with * their current value. Aggregators may not become visible until the system * begins executing the ParDo transform that created them and/or their initial * value is changed. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java index ad41a3f..3865654 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableRowIterator.java @@ -156,7 +156,7 @@ public class BigQueryTableRowIterator implements AutoCloseable { if (iteratorOverCurrentBatch != null && iteratorOverCurrentBatch.hasNext()) { // Embed schema information into the raw row, so that values have an // associated key. This matches how rows are read when using the - // DataflowPipelineRunner. + // DataflowRunner. current = getTypedTableRow(schema.getFields(), iteratorOverCurrentBatch.next()); return true; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java index fb8bb72..f9ce018 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/PipelineRunnerTest.java @@ -37,7 +37,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; /** - * Tests for DataflowPipelineRunner. + * Tests for DataflowRunner. */ @RunWith(JUnit4.class) public class PipelineRunnerTest { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java index 43c990a..3306cb4 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java @@ -70,7 +70,7 @@ import java.util.regex.Pattern; * <pre>{@code * --project=YOUR_PROJECT_ID * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowPipelineRunner + * --runner=BlockingDataflowRunner * --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"} * } * </pre> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java index 3e4fc86..98af2e7 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java @@ -17,7 +17,7 @@ */ package ${package}; -import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner; +import org.apache.beam.runners.dataflow.BlockingDataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; @@ -63,7 +63,7 @@ public class MinimalWordCount { // in Google Cloud Storage to stage files. DataflowPipelineOptions options = PipelineOptionsFactory.create() .as(DataflowPipelineOptions.class); - options.setRunner(BlockingDataflowPipelineRunner.class); + options.setRunner(BlockingDataflowRunner.class); // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud. options.setProject("SET_YOUR_PROJECT_ID_HERE"); // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java index 7dea9fe..8e56b03 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java @@ -82,7 +82,7 @@ import java.util.List; * <pre>{@code * --project=YOUR_PROJECT_ID * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowPipelineRunner + * --runner=BlockingDataflowRunner * } * </pre> * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java index fc1f4b5..07ed6d0 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java @@ -77,7 +77,7 @@ import org.apache.beam.sdk.values.PCollection; * <pre>{@code * --project=YOUR_PROJECT_ID * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowPipelineRunner + * --runner=BlockingDataflowRunner * } * </pre> * and an output prefix on GCS: http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java index 6ec4540..82f0eff 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java @@ -17,9 +17,9 @@ */ package ${package}.common; -import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner; +import org.apache.beam.runners.dataflow.BlockingDataflowRunner; import org.apache.beam.runners.dataflow.DataflowPipelineJob; -import org.apache.beam.runners.dataflow.DataflowPipelineRunner; +import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.MonitoringUtil; @@ -251,10 +251,10 @@ public class DataflowExampleUtils { } public void setupRunner() { - if (options.isStreaming() && options.getRunner().equals(BlockingDataflowPipelineRunner.class)) { + if (options.isStreaming() && options.getRunner().equals(BlockingDataflowRunner.class)) { // In order to cancel the pipelines automatically, - // {@literal DataflowPipelineRunner} is forced to be used. - options.setRunner(DataflowPipelineRunner.class); + // {@literal DataflowRunner} is forced to be used. + options.setRunner(DataflowRunner.class); } } @@ -268,7 +268,7 @@ public class DataflowExampleUtils { DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class); copiedOptions.setStreaming(false); copiedOptions.setWorkerHarnessContainerImage( - DataflowPipelineRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE); + DataflowRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE); copiedOptions.setNumWorkers( options.as(ExamplePubsubTopicOptions.class).getInjectorNumWorkers()); copiedOptions.setJobName(options.getJobName() + "-injector"); @@ -298,7 +298,7 @@ public class DataflowExampleUtils { } /** - * If {@literal DataflowPipelineRunner} or {@literal BlockingDataflowPipelineRunner} is used, + * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used, * waits for the pipeline to finish and cancels it (and the injector) before the program exists. */ public void waitToFinish(PipelineResult result) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java index 027431f..9a75bb7 100644 --- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java +++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java @@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory; * Platform, you should specify the following command-line options: * --project=<YOUR_PROJECT_ID> * --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE> - * --runner=BlockingDataflowPipelineRunner + * --runner=BlockingDataflowRunner */ public class StarterPipeline { private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java index bb86b0d..8c71d9d 100644 --- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java +++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java @@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory; * Platform, you should specify the following command-line options: * --project=<YOUR_PROJECT_ID> * --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE> - * --runner=BlockingDataflowPipelineRunner + * --runner=BlockingDataflowRunner */ public class StarterPipeline { private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/sdks/java/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml index 09fe7d9..55aea6a 100644 --- a/sdks/java/pom.xml +++ b/sdks/java/pom.xml @@ -37,7 +37,7 @@ <module>core</module> <module>io</module> <!-- sdks/java/maven-archtypes has several dependencies on the - DataflowPipelineRunner. Until these are refactored out or + DataflowRunner. Until these are refactored out or a released artifact exists, we need to modify the build order. <module>maven-archetypes</module> --> <module>extensions</module>