http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerHooks.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerHooks.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerHooks.java new file mode 100644 index 0000000..05297ec --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerHooks.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import org.apache.beam.sdk.annotations.Experimental; + +import com.google.api.services.dataflow.model.Environment; + +/** + * An instance of this class can be passed to the + * {@link DataflowRunner} to add user defined hooks to be + * invoked at various times during pipeline execution. + */ +@Experimental +public class DataflowRunnerHooks { + /** + * Allows the user to modify the environment of their job before their job is submitted + * to the service for execution. 
+ * + * @param environment The environment of the job. Users can make changes to this instance in order + * to change the environment with which their job executes on the service. + */ + public void modifyEnvironmentBeforeSubmission(Environment environment) {} +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/package-info.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/package-info.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/package-info.java index f2e8459..ea83bfb 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/package-info.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/package-info.java @@ -16,6 +16,6 @@ * limitations under the License. */ /** - * Implementation of the {@link org.apache.beam.runners.dataflow.DataflowPipelineRunner}. + * Implementation of the {@link org.apache.beam.runners.dataflow.DataflowRunner}. */ package org.apache.beam.runners.dataflow.internal; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java index 7fa5ad6..809df35 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.dataflow.options; -import 
org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner; +import org.apache.beam.runners.dataflow.BlockingDataflowRunner; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; @@ -29,9 +29,9 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import java.io.PrintStream; /** - * Options that are used to configure the {@link BlockingDataflowPipelineRunner}. + * Options that are used to configure the {@link BlockingDataflowRunner}. */ -@Description("Configure options on the BlockingDataflowPipelineRunner.") +@Description("Configure options on the BlockingDataflowRunner.") public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions { /** * Output stream for job status messages. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index 6e6ad96..f665a08 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.dataflow.options; -import org.apache.beam.runners.dataflow.DataflowPipelineRunner; +import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.options.Default; @@ -38,7 +38,7 @@ import 
org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; /** - * Options that can be used to configure the {@link DataflowPipelineRunner}. + * Options that can be used to configure the {@link DataflowRunner}. */ @Description("Options that configure the Dataflow pipeline.") public interface DataflowPipelineOptions http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java index a29b328..e3a1a0f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.dataflow.options; -import org.apache.beam.runners.dataflow.DataflowPipelineRunner; +import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; @@ -131,9 +131,9 @@ public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions { public String create(PipelineOptions options) { DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); if (dataflowOptions.isStreaming()) { - return DataflowPipelineRunner.STREAMING_WORKER_HARNESS_CONTAINER_IMAGE; + return DataflowRunner.STREAMING_WORKER_HARNESS_CONTAINER_IMAGE; } else { - return 
DataflowPipelineRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE; + return DataflowRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE; } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java index c940e9a..f83a139 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java @@ -21,7 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import org.apache.beam.runners.dataflow.DataflowJobExecutionException; import org.apache.beam.runners.dataflow.DataflowPipelineJob; -import org.apache.beam.runners.dataflow.DataflowPipelineRunner; +import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.util.MonitoringUtil; import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler; import org.apache.beam.sdk.Pipeline; @@ -54,7 +54,7 @@ import java.util.concurrent.TimeUnit; /** * {@link TestDataflowPipelineRunner} is a pipeline runner that wraps a - * {@link DataflowPipelineRunner} when running tests against the {@link TestPipeline}. + * {@link DataflowRunner} when running tests against the {@link TestPipeline}. 
* * @see TestPipeline */ @@ -63,12 +63,12 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class); private final TestDataflowPipelineOptions options; - private final DataflowPipelineRunner runner; + private final DataflowRunner runner; private int expectedNumberOfAssertions = 0; TestDataflowPipelineRunner(TestDataflowPipelineOptions options) { this.options = options; - this.runner = DataflowPipelineRunner.fromOptions(options); + this.runner = DataflowRunner.fromOptions(options); } /** @@ -89,7 +89,7 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ return run(pipeline, runner); } - DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) { + DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) { TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class); final DataflowPipelineJob job; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java deleted file mode 100644 index 55b4027..0000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; -import org.apache.beam.runners.dataflow.util.MonitoringUtil; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult.State; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.ExpectedLogs; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.util.NoopPathValidator; -import org.apache.beam.sdk.util.TestCredential; - -import org.hamcrest.Description; -import org.hamcrest.Factory; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeMatcher; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.concurrent.TimeUnit; - -/** - * Tests for BlockingDataflowPipelineRunner. 
- */ -@RunWith(JUnit4.class) -public class BlockingDataflowPipelineRunnerTest { - - @Rule - public ExpectedLogs expectedLogs = ExpectedLogs.none(BlockingDataflowPipelineRunner.class); - - @Rule - public ExpectedException expectedThrown = ExpectedException.none(); - - /** - * A {@link Matcher} for a {@link DataflowJobException} that applies an underlying {@link Matcher} - * to the {@link DataflowPipelineJob} returned by {@link DataflowJobException#getJob()}. - */ - private static class DataflowJobExceptionMatcher<T extends DataflowJobException> - extends TypeSafeMatcher<T> { - - private final Matcher<DataflowPipelineJob> matcher; - - public DataflowJobExceptionMatcher(Matcher<DataflowPipelineJob> matcher) { - this.matcher = matcher; - } - - @Override - public boolean matchesSafely(T ex) { - return matcher.matches(ex.getJob()); - } - - @Override - protected void describeMismatchSafely(T item, Description description) { - description.appendText("job "); - matcher.describeMismatch(item.getMessage(), description); - } - - @Override - public void describeTo(Description description) { - description.appendText("exception with job matching "); - description.appendDescriptionOf(matcher); - } - - @Factory - public static <T extends DataflowJobException> Matcher<T> expectJob( - Matcher<DataflowPipelineJob> matcher) { - return new DataflowJobExceptionMatcher<T>(matcher); - } - } - - /** - * A {@link Matcher} for a {@link DataflowPipelineJob} that applies an underlying {@link Matcher} - * to the return value of {@link DataflowPipelineJob#getJobId()}. 
- */ - private static class JobIdMatcher<T extends DataflowPipelineJob> extends TypeSafeMatcher<T> { - - private final Matcher<String> matcher; - - public JobIdMatcher(Matcher<String> matcher) { - this.matcher = matcher; - } - - @Override - public boolean matchesSafely(T job) { - return matcher.matches(job.getJobId()); - } - - @Override - protected void describeMismatchSafely(T item, Description description) { - description.appendText("jobId "); - matcher.describeMismatch(item.getJobId(), description); - } - - @Override - public void describeTo(Description description) { - description.appendText("job with jobId "); - description.appendDescriptionOf(matcher); - } - - @Factory - public static <T extends DataflowPipelineJob> Matcher<T> expectJobId(final String jobId) { - return new JobIdMatcher<T>(equalTo(jobId)); - } - } - - /** - * A {@link Matcher} for a {@link DataflowJobUpdatedException} that applies an underlying - * {@link Matcher} to the {@link DataflowPipelineJob} returned by - * {@link DataflowJobUpdatedException#getReplacedByJob()}. 
- */ - private static class ReplacedByJobMatcher<T extends DataflowJobUpdatedException> - extends TypeSafeMatcher<T> { - - private final Matcher<DataflowPipelineJob> matcher; - - public ReplacedByJobMatcher(Matcher<DataflowPipelineJob> matcher) { - this.matcher = matcher; - } - - @Override - public boolean matchesSafely(T ex) { - return matcher.matches(ex.getReplacedByJob()); - } - - @Override - protected void describeMismatchSafely(T item, Description description) { - description.appendText("job "); - matcher.describeMismatch(item.getMessage(), description); - } - - @Override - public void describeTo(Description description) { - description.appendText("exception with replacedByJob() "); - description.appendDescriptionOf(matcher); - } - - @Factory - public static <T extends DataflowJobUpdatedException> Matcher<T> expectReplacedBy( - Matcher<DataflowPipelineJob> matcher) { - return new ReplacedByJobMatcher<T>(matcher); - } - } - - /** - * Creates a mocked {@link DataflowPipelineJob} with the given {@code projectId} and {@code jobId} - * that will immediately terminate in the provided {@code terminalState}. - * - * <p>The return value may be further mocked. - */ - private DataflowPipelineJob createMockJob( - String projectId, String jobId, State terminalState) throws Exception { - DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class); - when(mockJob.getProjectId()).thenReturn(projectId); - when(mockJob.getJobId()).thenReturn(jobId); - when(mockJob.waitToFinish( - anyLong(), isA(TimeUnit.class), isA(MonitoringUtil.JobMessagesHandler.class))) - .thenReturn(terminalState); - return mockJob; - } - - /** - * Returns a {@link BlockingDataflowPipelineRunner} that will return the provided a job to return. - * Some {@link PipelineOptions} will be extracted from the job, such as the project ID. 
- */ - private BlockingDataflowPipelineRunner createMockRunner(DataflowPipelineJob job) - throws Exception { - DataflowPipelineRunner mockRunner = mock(DataflowPipelineRunner.class); - TestDataflowPipelineOptions options = - PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); - options.setRunner(BlockingDataflowPipelineRunner.class); - options.setProject(job.getProjectId()); - - when(mockRunner.run(isA(Pipeline.class))).thenReturn(job); - - return new BlockingDataflowPipelineRunner(mockRunner, options); - } - - /** - * Tests that the {@link BlockingDataflowPipelineRunner} returns normally when a job terminates in - * the {@link State#DONE DONE} state. - */ - @Test - public void testJobDoneComplete() throws Exception { - createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE)) - .run(TestPipeline.create()); - expectedLogs.verifyInfo("Job finished with status DONE"); - } - - /** - * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception - * when a job terminates in the {@link State#FAILED FAILED} state. - */ - @Test - public void testFailedJobThrowsException() throws Exception { - expectedThrown.expect(DataflowJobExecutionException.class); - expectedThrown.expect(DataflowJobExceptionMatcher.expectJob( - JobIdMatcher.expectJobId("testFailedJob-jobId"))); - createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED)) - .run(TestPipeline.create()); - } - - /** - * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception - * when a job terminates in the {@link State#CANCELLED CANCELLED} state. 
- */ - @Test - public void testCancelledJobThrowsException() throws Exception { - expectedThrown.expect(DataflowJobCancelledException.class); - expectedThrown.expect(DataflowJobExceptionMatcher.expectJob( - JobIdMatcher.expectJobId("testCancelledJob-jobId"))); - createMockRunner( - createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED)) - .run(TestPipeline.create()); - } - - /** - * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception - * when a job terminates in the {@link State#UPDATED UPDATED} state. - */ - @Test - public void testUpdatedJobThrowsException() throws Exception { - expectedThrown.expect(DataflowJobUpdatedException.class); - expectedThrown.expect(DataflowJobExceptionMatcher.expectJob( - JobIdMatcher.expectJobId("testUpdatedJob-jobId"))); - expectedThrown.expect(ReplacedByJobMatcher.expectReplacedBy( - JobIdMatcher.expectJobId("testUpdatedJob-replacedByJobId"))); - DataflowPipelineJob job = - createMockJob("testUpdatedJob-projectId", "testUpdatedJob-jobId", State.UPDATED); - DataflowPipelineJob replacedByJob = - createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE); - when(job.getReplacedByJob()).thenReturn(replacedByJob); - createMockRunner(job).run(TestPipeline.create()); - } - - /** - * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception - * when a job terminates in the {@link State#UNKNOWN UNKNOWN} state, indicating that the - * Dataflow service returned a state that the SDK is unfamiliar with (possibly because it - * is an old SDK relative the service). 
- */ - @Test - public void testUnknownJobThrowsException() throws Exception { - expectedThrown.expect(IllegalStateException.class); - createMockRunner( - createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN)) - .run(TestPipeline.create()); - } - - /** - * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception - * when a job returns a {@code null} state, indicating that it failed to contact the service, - * including all of its built-in resilience logic. - */ - @Test - public void testNullJobThrowsException() throws Exception { - expectedThrown.expect(DataflowServiceException.class); - expectedThrown.expect(DataflowJobExceptionMatcher.expectJob( - JobIdMatcher.expectJobId("testNullJob-jobId"))); - createMockRunner(createMockJob("testNullJob-projectId", "testNullJob-jobId", null)) - .run(TestPipeline.create()); - } - - @Test - public void testToString() { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setJobName("TestJobName"); - options.setProject("test-project"); - options.setTempLocation("gs://test/temp/location"); - options.setGcpCredential(new TestCredential()); - options.setPathValidatorClass(NoopPathValidator.class); - options.setRunner(BlockingDataflowPipelineRunner.class); - assertEquals("BlockingDataflowPipelineRunner#testjobname", - BlockingDataflowPipelineRunner.fromOptions(options).toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java new file mode 100644 index 0000000..7be074e --- 
/dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; +import org.apache.beam.runners.dataflow.util.MonitoringUtil; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.ExpectedLogs; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.util.NoopPathValidator; +import org.apache.beam.sdk.util.TestCredential; + +import org.hamcrest.Description; +import org.hamcrest.Factory; +import org.hamcrest.Matcher; +import 
org.hamcrest.TypeSafeMatcher; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.concurrent.TimeUnit; + +/** + * Tests for BlockingDataflowRunner. + */ +@RunWith(JUnit4.class) +public class BlockingDataflowRunnerTest { + + @Rule + public ExpectedLogs expectedLogs = ExpectedLogs.none(BlockingDataflowRunner.class); + + @Rule + public ExpectedException expectedThrown = ExpectedException.none(); + + /** + * A {@link Matcher} for a {@link DataflowJobException} that applies an underlying {@link Matcher} + * to the {@link DataflowPipelineJob} returned by {@link DataflowJobException#getJob()}. + */ + private static class DataflowJobExceptionMatcher<T extends DataflowJobException> + extends TypeSafeMatcher<T> { + + private final Matcher<DataflowPipelineJob> matcher; + + public DataflowJobExceptionMatcher(Matcher<DataflowPipelineJob> matcher) { + this.matcher = matcher; + } + + @Override + public boolean matchesSafely(T ex) { + return matcher.matches(ex.getJob()); + } + + @Override + protected void describeMismatchSafely(T item, Description description) { + description.appendText("job "); + matcher.describeMismatch(item.getMessage(), description); + } + + @Override + public void describeTo(Description description) { + description.appendText("exception with job matching "); + description.appendDescriptionOf(matcher); + } + + @Factory + public static <T extends DataflowJobException> Matcher<T> expectJob( + Matcher<DataflowPipelineJob> matcher) { + return new DataflowJobExceptionMatcher<T>(matcher); + } + } + + /** + * A {@link Matcher} for a {@link DataflowPipelineJob} that applies an underlying {@link Matcher} + * to the return value of {@link DataflowPipelineJob#getJobId()}. 
+ */ + private static class JobIdMatcher<T extends DataflowPipelineJob> extends TypeSafeMatcher<T> { + + private final Matcher<String> matcher; + + public JobIdMatcher(Matcher<String> matcher) { + this.matcher = matcher; + } + + @Override + public boolean matchesSafely(T job) { + return matcher.matches(job.getJobId()); + } + + @Override + protected void describeMismatchSafely(T item, Description description) { + description.appendText("jobId "); + matcher.describeMismatch(item.getJobId(), description); + } + + @Override + public void describeTo(Description description) { + description.appendText("job with jobId "); + description.appendDescriptionOf(matcher); + } + + @Factory + public static <T extends DataflowPipelineJob> Matcher<T> expectJobId(final String jobId) { + return new JobIdMatcher<T>(equalTo(jobId)); + } + } + + /** + * A {@link Matcher} for a {@link DataflowJobUpdatedException} that applies an underlying + * {@link Matcher} to the {@link DataflowPipelineJob} returned by + * {@link DataflowJobUpdatedException#getReplacedByJob()}. 
+ */ + private static class ReplacedByJobMatcher<T extends DataflowJobUpdatedException> + extends TypeSafeMatcher<T> { + + private final Matcher<DataflowPipelineJob> matcher; + + public ReplacedByJobMatcher(Matcher<DataflowPipelineJob> matcher) { + this.matcher = matcher; + } + + @Override + public boolean matchesSafely(T ex) { + return matcher.matches(ex.getReplacedByJob()); + } + + @Override + protected void describeMismatchSafely(T item, Description description) { + description.appendText("job "); + matcher.describeMismatch(item.getMessage(), description); + } + + @Override + public void describeTo(Description description) { + description.appendText("exception with replacedByJob() "); + description.appendDescriptionOf(matcher); + } + + @Factory + public static <T extends DataflowJobUpdatedException> Matcher<T> expectReplacedBy( + Matcher<DataflowPipelineJob> matcher) { + return new ReplacedByJobMatcher<T>(matcher); + } + } + + /** + * Creates a mocked {@link DataflowPipelineJob} with the given {@code projectId} and {@code jobId} + * that will immediately terminate in the provided {@code terminalState}. + * + * <p>The return value may be further mocked. + */ + private DataflowPipelineJob createMockJob( + String projectId, String jobId, State terminalState) throws Exception { + DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class); + when(mockJob.getProjectId()).thenReturn(projectId); + when(mockJob.getJobId()).thenReturn(jobId); + when(mockJob.waitToFinish( + anyLong(), isA(TimeUnit.class), isA(MonitoringUtil.JobMessagesHandler.class))) + .thenReturn(terminalState); + return mockJob; + } + + /** + * Returns a {@link BlockingDataflowRunner} that will return the provided a job to return. + * Some {@link PipelineOptions} will be extracted from the job, such as the project ID. 
+ */ + private BlockingDataflowRunner createMockRunner(DataflowPipelineJob job) + throws Exception { + DataflowRunner mockRunner = mock(DataflowRunner.class); + TestDataflowPipelineOptions options = + PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); + options.setRunner(BlockingDataflowRunner.class); + options.setProject(job.getProjectId()); + + when(mockRunner.run(isA(Pipeline.class))).thenReturn(job); + + return new BlockingDataflowRunner(mockRunner, options); + } + + /** + * Tests that the {@link BlockingDataflowRunner} returns normally when a job terminates in + * the {@link State#DONE DONE} state. + */ + @Test + public void testJobDoneComplete() throws Exception { + createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE)) + .run(TestPipeline.create()); + expectedLogs.verifyInfo("Job finished with status DONE"); + } + + /** + * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception + * when a job terminates in the {@link State#FAILED FAILED} state. + */ + @Test + public void testFailedJobThrowsException() throws Exception { + expectedThrown.expect(DataflowJobExecutionException.class); + expectedThrown.expect(DataflowJobExceptionMatcher.expectJob( + JobIdMatcher.expectJobId("testFailedJob-jobId"))); + createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED)) + .run(TestPipeline.create()); + } + + /** + * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception + * when a job terminates in the {@link State#CANCELLED CANCELLED} state. 
+ */ + @Test + public void testCancelledJobThrowsException() throws Exception { + expectedThrown.expect(DataflowJobCancelledException.class); + expectedThrown.expect(DataflowJobExceptionMatcher.expectJob( + JobIdMatcher.expectJobId("testCancelledJob-jobId"))); + createMockRunner( + createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED)) + .run(TestPipeline.create()); + } + + /** + * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception + * when a job terminates in the {@link State#UPDATED UPDATED} state. + */ + @Test + public void testUpdatedJobThrowsException() throws Exception { + expectedThrown.expect(DataflowJobUpdatedException.class); + expectedThrown.expect(DataflowJobExceptionMatcher.expectJob( + JobIdMatcher.expectJobId("testUpdatedJob-jobId"))); + expectedThrown.expect(ReplacedByJobMatcher.expectReplacedBy( + JobIdMatcher.expectJobId("testUpdatedJob-replacedByJobId"))); + DataflowPipelineJob job = + createMockJob("testUpdatedJob-projectId", "testUpdatedJob-jobId", State.UPDATED); + DataflowPipelineJob replacedByJob = + createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE); + when(job.getReplacedByJob()).thenReturn(replacedByJob); + createMockRunner(job).run(TestPipeline.create()); + } + + /** + * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception + * when a job terminates in the {@link State#UNKNOWN UNKNOWN} state, indicating that the + * Dataflow service returned a state that the SDK is unfamiliar with (possibly because it + * is an old SDK relative the service). 
+ */ + @Test + public void testUnknownJobThrowsException() throws Exception { + expectedThrown.expect(IllegalStateException.class); + createMockRunner( + createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN)) + .run(TestPipeline.create()); + } + + /** + * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception + * when a job returns a {@code null} state, indicating that it failed to contact the service, + * including all of its built-in resilience logic. + */ + @Test + public void testNullJobThrowsException() throws Exception { + expectedThrown.expect(DataflowServiceException.class); + expectedThrown.expect(DataflowJobExceptionMatcher.expectJob( + JobIdMatcher.expectJobId("testNullJob-jobId"))); + createMockRunner(createMockJob("testNullJob-projectId", "testNullJob-jobId", null)) + .run(TestPipeline.create()); + } + + @Test + public void testToString() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setJobName("TestJobName"); + options.setProject("test-project"); + options.setTempLocation("gs://test/temp/location"); + options.setGcpCredential(new TestCredential()); + options.setPathValidatorClass(NoopPathValidator.class); + options.setRunner(BlockingDataflowRunner.class); + assertEquals("BlockingDataflowRunner#testjobname", + BlockingDataflowRunner.fromOptions(options).toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java index cf9a95a..388a85a 100644 --- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java @@ -46,8 +46,8 @@ public class DataflowPipelineRegistrarTest { @Test public void testCorrectRunnersAreReturned() { - assertEquals(ImmutableList.of(DataflowPipelineRunner.class, - BlockingDataflowPipelineRunner.class), + assertEquals(ImmutableList.of(DataflowRunner.class, + BlockingDataflowRunner.class), new DataflowPipelineRegistrar.Runner().getPipelineRunners()); }
