Repository: incubator-beam Updated Branches: refs/heads/master afedd68e8 -> b2b570f27
[BEAM-1047] Update dataflow runner code to use DataflowClient wrapper. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ce03f30c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ce03f30c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ce03f30c Branch: refs/heads/master Commit: ce03f30c1ee0b84ad2e7f10a6272ffb25548244a Parents: e8c9686 Author: Pei He <pe...@google.com> Authored: Mon Nov 28 11:47:42 2016 -0800 Committer: bchambers <bchamb...@google.com> Committed: Tue Dec 6 17:08:12 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowClient.java | 36 +++++++++++++------- .../runners/dataflow/DataflowPipelineJob.java | 23 ++++++------- .../beam/runners/dataflow/DataflowRunner.java | 16 +++------ .../dataflow/testing/TestDataflowRunner.java | 6 ++-- .../runners/dataflow/util/MonitoringUtil.java | 22 +++--------- .../dataflow/DataflowPipelineJobTest.java | 1 + .../transforms/DataflowGroupByKeyTest.java | 14 +++++++- .../dataflow/transforms/DataflowViewTest.java | 16 +++++++-- .../dataflow/util/MonitoringUtilTest.java | 21 +++--------- 9 files changed, 80 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java index f2081db..3536d72 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java @@ -35,27 +35,28 @@ import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; /** - * Client library for {@link Dataflow}. + * Wrapper around the generated {@link Dataflow} client to provide common functionality. */ public class DataflowClient { public static DataflowClient create(DataflowPipelineOptions options) { - return new DataflowClient(options.getDataflowClient(), options); + return new DataflowClient(options.getDataflowClient(), options.getProject()); } private final Dataflow dataflow; - private final DataflowPipelineOptions options; + private final String projectId; - private DataflowClient(Dataflow dataflow, DataflowPipelineOptions options) { + private DataflowClient(Dataflow dataflow, String projectId) { this.dataflow = checkNotNull(dataflow, "dataflow"); - this.options = checkNotNull(options, "options"); + this.projectId = checkNotNull(projectId, "options"); } /** * Creates the Dataflow {@link Job}. */ public Job createJob(@Nonnull Job job) throws IOException { - Jobs.Create jobsCreate = dataflow.projects().jobs().create(options.getProject(), job); + checkNotNull(job, "job"); + Jobs.Create jobsCreate = dataflow.projects().jobs().create(projectId, job); return jobsCreate.execute(); } @@ -65,7 +66,7 @@ public class DataflowClient { */ public ListJobsResponse listJobs(@Nullable String pageToken) throws IOException { Jobs.List jobsList = dataflow.projects().jobs() - .list(options.getProject()) + .list(projectId) .setPageToken(pageToken); return jobsList.execute(); } @@ -74,8 +75,10 @@ public class DataflowClient { * Updates the Dataflow {@link Job} with the given {@code jobId}. */ public Job updateJob(@Nonnull String jobId, @Nonnull Job content) throws IOException { + checkNotNull(jobId, "jobId"); + checkNotNull(content, "content"); Jobs.Update jobsUpdate = dataflow.projects().jobs() - .update(options.getProject(), jobId, content); + .update(projectId, jobId, content); return jobsUpdate.execute(); } @@ -83,8 +86,9 @@ public class DataflowClient { * Gets the Dataflow {@link Job} with the given {@code jobId}. */ public Job getJob(@Nonnull String jobId) throws IOException { + checkNotNull(jobId, "jobId"); Jobs.Get jobsGet = dataflow.projects().jobs() - .get(options.getProject(), jobId); + .get(projectId, jobId); return jobsGet.execute(); } @@ -92,8 +96,9 @@ public class DataflowClient { * Gets the {@link JobMetrics} with the given {@code jobId}. */ public JobMetrics getJobMetrics(@Nonnull String jobId) throws IOException { + checkNotNull(jobId, "jobId"); Jobs.GetMetrics jobsGetMetrics = dataflow.projects().jobs() - .getMetrics(options.getProject(), jobId); + .getMetrics(projectId, jobId); return jobsGetMetrics.execute(); } @@ -102,8 +107,9 @@ public class DataflowClient { */ public ListJobMessagesResponse listJobMessages( @Nonnull String jobId, @Nullable String pageToken) throws IOException { + checkNotNull(jobId, "jobId"); Jobs.Messages.List jobMessagesList = dataflow.projects().jobs().messages() - .list(options.getProject(), jobId) + .list(projectId, jobId) .setPageToken(pageToken); return jobMessagesList.execute(); } @@ -113,8 +119,10 @@ public class DataflowClient { */ public LeaseWorkItemResponse leaseWorkItem( @Nonnull String jobId, @Nonnull LeaseWorkItemRequest request) throws IOException { + checkNotNull(jobId, "jobId"); + checkNotNull(request, "request"); Jobs.WorkItems.Lease jobWorkItemsLease = dataflow.projects().jobs().workItems() - .lease(options.getProject(), jobId, request); + .lease(projectId, jobId, request); return jobWorkItemsLease.execute(); } @@ -123,8 +131,10 @@ public class DataflowClient { */ public ReportWorkItemStatusResponse reportWorkItemStatus( @Nonnull String jobId, @Nonnull ReportWorkItemStatusRequest request) throws IOException { + checkNotNull(jobId, "jobId"); + checkNotNull(request, "request"); Jobs.WorkItems.ReportStatus jobWorkItemsReportStatus = dataflow.projects().jobs().workItems() - .reportStatus(options.getProject(), jobId, request); + .reportStatus(projectId, jobId, request); return jobWorkItemsReportStatus.execute(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 58e85e0..00c88f9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -62,10 +62,15 @@ public class DataflowPipelineJob implements PipelineResult { private String jobId; /** + * The {@link DataflowPipelineOptions} for the job. + */ + private final DataflowPipelineOptions dataflowOptions; + + /** * Client for the Dataflow service. This can be used to query the service * for information about the job. */ - private DataflowPipelineOptions dataflowOptions; + private final DataflowClient dataflowClient; /** * The state the job terminated in or {@code null} if the job has not terminated. @@ -124,6 +129,7 @@ public class DataflowPipelineJob implements PipelineResult { DataflowAggregatorTransforms aggregatorTransforms) { this.jobId = jobId; this.dataflowOptions = dataflowOptions; + this.dataflowClient = (dataflowOptions == null ? null : DataflowClient.create(dataflowOptions)); this.aggregatorTransforms = aggregatorTransforms; } @@ -241,7 +247,7 @@ public class DataflowPipelineJob implements PipelineResult { MonitoringUtil.JobMessagesHandler messageHandler, Sleeper sleeper, NanoClock nanoClock) throws IOException, InterruptedException { - MonitoringUtil monitor = new MonitoringUtil(getProjectId(), dataflowOptions.getDataflowClient()); + MonitoringUtil monitor = new MonitoringUtil(dataflowClient); long lastTimestamp = 0; BackOff backoff; @@ -334,9 +340,7 @@ public class DataflowPipelineJob implements PipelineResult { content.setId(jobId); content.setRequestedState("JOB_STATE_CANCELLED"); try { - dataflowOptions.getDataflowClient().projects().jobs() - .update(getProjectId(), jobId, content) - .execute(); + dataflowClient.updateJob(jobId, content); return State.CANCELLED; } catch (IOException e) { State state = getState(); @@ -401,11 +405,7 @@ public class DataflowPipelineJob implements PipelineResult { // Retry loop ends in return or throw while (true) { try { - Job job = dataflowOptions.getDataflowClient() - .projects() - .jobs() - .get(getProjectId(), jobId) - .execute(); + Job job = dataflowClient.getJob(jobId); State currentState = MonitoringUtil.toState(job.getCurrentState()); if (currentState.isTerminal()) { terminalState = currentState; @@ -476,8 +476,7 @@ public class DataflowPipelineJob implements PipelineResult { metricUpdates = terminalMetricUpdates; } else { boolean terminal = getState().isTerminal(); - JobMetrics jobMetrics = dataflowOptions.getDataflowClient() - .projects().jobs().getMetrics(getProjectId(), jobId).execute(); + JobMetrics jobMetrics = dataflowClient.getJobMetrics(jobId); metricUpdates = jobMetrics.getMetrics(); if (terminal && jobMetrics.getMetrics() != null) { terminalMetricUpdates = metricUpdates; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index e781b4e..40d8948 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -32,7 +32,6 @@ import com.google.api.services.clouddebugger.v2.Clouddebugger; import com.google.api.services.clouddebugger.v2.model.Debuggee; import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest; import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse; -import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.DataflowPackage; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.ListJobsResponse; @@ -194,7 +193,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { private final DataflowPipelineOptions options; /** Client for the Dataflow service. This is used to actually submit jobs. */ - private final Dataflow dataflowClient; + private final DataflowClient dataflowClient; /** Translator for this DataflowRunner, based on options. */ private final DataflowPipelineTranslator translator; @@ -321,7 +320,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { @VisibleForTesting protected DataflowRunner(DataflowPipelineOptions options) { this.options = options; - this.dataflowClient = options.getDataflowClient(); + this.dataflowClient = DataflowClient.create(options); this.translator = DataflowPipelineTranslator.fromOptions(options); this.pcollectionsRequiringIndexedFormat = new HashSet<>(); this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>(); @@ -597,11 +596,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } Job jobResult; try { - jobResult = dataflowClient - .projects() - .jobs() - .create(options.getProject(), newJob) - .execute(); + jobResult = dataflowClient.createJob(newJob); } catch (GoogleJsonResponseException e) { String errorMessages = "Unexpected errors"; if (e.getDetails() != null) { @@ -2830,10 +2825,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { ListJobsResponse listResult; String token = null; do { - listResult = dataflowClient.projects().jobs() - .list(options.getProject()) - .setPageToken(token) - .execute(); + listResult = dataflowClient.listJobs(token); token = listResult.getNextPageToken(); for (Job job : listResult.getJobs()) { if (job.getName().equals(jobName) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index 70c3f58..4b0fcf2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -34,6 +34,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.DataflowClient; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.util.MonitoringUtil; @@ -65,11 +66,13 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class); private final TestDataflowPipelineOptions options; + private final DataflowClient dataflowClient; private final DataflowRunner runner; private int expectedNumberOfAssertions = 0; TestDataflowRunner(TestDataflowPipelineOptions options) { this.options = options; + this.dataflowClient = DataflowClient.create(options); this.runner = DataflowRunner.fromOptions(options); } @@ -279,8 +282,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> { JobMetrics getJobMetrics(DataflowPipelineJob job) { JobMetrics metrics = null; try { - metrics = options.getDataflowClient().projects().jobs() - .getMetrics(job.getProjectId(), job.getJobId()).execute(); + metrics = dataflowClient.getJobMetrics(job.getJobId()); } catch (IOException e) { LOG.warn("Failed to get job metrics: ", e); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java index efb6d2b..d0a24bf 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow.util; import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime; import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages; import com.google.api.services.dataflow.model.JobMessage; import com.google.api.services.dataflow.model.ListJobMessagesResponse; import com.google.common.base.MoreObjects; @@ -35,6 +34,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.DataflowClient; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.PipelineResult.State; import org.joda.time.Instant; @@ -67,8 +67,7 @@ public final class MonitoringUtil { private static final String JOB_MESSAGE_DETAILED = "JOB_MESSAGE_DETAILED"; private static final String JOB_MESSAGE_DEBUG = "JOB_MESSAGE_DEBUG"; - private String projectId; - private Messages messagesClient; + private final DataflowClient dataflowClient; /** * An interface that can be used for defining callbacks to receive a list @@ -115,14 +114,8 @@ public final class MonitoringUtil { } /** Construct a helper for monitoring. */ - public MonitoringUtil(String projectId, Dataflow dataflow) { - this(projectId, dataflow.projects().jobs().messages()); - } - - // @VisibleForTesting - MonitoringUtil(String projectId, Messages messagesClient) { - this.projectId = projectId; - this.messagesClient = messagesClient; + public MonitoringUtil(DataflowClient dataflowClient) { + this.dataflowClient = dataflowClient; } /** @@ -157,12 +150,7 @@ public final class MonitoringUtil { ArrayList<JobMessage> allMessages = new ArrayList<>(); String pageToken = null; while (true) { - Messages.List listRequest = messagesClient.list(projectId, jobId); - if (pageToken != null) { - listRequest.setPageToken(pageToken); - } - ListJobMessagesResponse response = listRequest.execute(); - + ListJobMessagesResponse response = dataflowClient.listJobMessages(jobId, pageToken); if (response == null || response.getJobMessages() == null) { return allMessages; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 323f762..1890da1 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -157,6 +157,7 @@ public class DataflowPipelineJobTest { Messages.List listRequest = mock(Dataflow.Projects.Jobs.Messages.List.class); when(mockJobs.messages()).thenReturn(mockMessages); when(mockMessages.list(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(listRequest); + when(listRequest.setPageToken(eq((String) null))).thenReturn(listRequest); when(listRequest.execute()).thenThrow(SocketTimeoutException.class); DataflowAggregatorTransforms dataflowAggregatorTransforms = mock(DataflowAggregatorTransforms.class); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java index bb84d98..67408ae 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow.transforms; +import com.google.api.services.dataflow.Dataflow; import java.util.Arrays; import java.util.List; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -38,11 +39,14 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Duration; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; /** Tests for {@link GroupByKey} for the {@link DataflowRunner}. */ @RunWith(JUnit4.class) @@ -50,6 +54,14 @@ public class DataflowGroupByKeyTest { @Rule public ExpectedException thrown = ExpectedException.none(); + @Mock + private Dataflow dataflow; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + } + /** * Create a test pipeline that uses the {@link DataflowRunner} so that {@link GroupByKey} * is not expanded. This is used for verifying that even without expansion the proper errors show @@ -61,7 +73,7 @@ public class DataflowGroupByKeyTest { options.setProject("someproject"); options.setGcpTempLocation("gs://staging"); options.setPathValidatorClass(NoopPathValidator.class); - options.setDataflowClient(null); + options.setDataflowClient(dataflow); return Pipeline.create(options); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java index ed3f2cd..b9220af 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow.transforms; +import com.google.api.services.dataflow.Dataflow; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; @@ -36,12 +37,15 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TypeDescriptor; import org.hamcrest.Matchers; import org.joda.time.Duration; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; /** Tests for {@link View} for a {@link DataflowRunner}. */ @RunWith(JUnit4.class) @@ -49,13 +53,21 @@ public class DataflowViewTest { @Rule public transient ExpectedException thrown = ExpectedException.none(); + @Mock + private Dataflow dataflow; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + } + private Pipeline createTestBatchRunner() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setRunner(DataflowRunner.class); options.setProject("someproject"); options.setGcpTempLocation("gs://staging"); options.setPathValidatorClass(NoopPathValidator.class); - options.setDataflowClient(null); + options.setDataflowClient(dataflow); return Pipeline.create(options); } @@ -66,7 +78,7 @@ public class DataflowViewTest { options.setProject("someproject"); options.setGcpTempLocation("gs://staging"); options.setPathValidatorClass(NoopPathValidator.class); - options.setDataflowClient(null); + options.setDataflowClient(dataflow); return Pipeline.create(options); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java index 6c5a2be..23ed26f 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java @@ -19,16 +19,15 @@ package org.apache.beam.runners.dataflow.util; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.JobMessage; import com.google.api.services.dataflow.model.ListJobMessagesResponse; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.beam.runners.dataflow.DataflowClient; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.MonitoringUtil.LoggingHandler; import org.apache.beam.sdk.PipelineResult.State; @@ -57,15 +56,7 @@ public class MonitoringUtilTest { @Test public void testGetJobMessages() throws IOException { - Dataflow.Projects.Jobs.Messages mockMessages = mock(Dataflow.Projects.Jobs.Messages.class); - - // Two requests are needed to get all the messages. - Dataflow.Projects.Jobs.Messages.List firstRequest = - mock(Dataflow.Projects.Jobs.Messages.List.class); - Dataflow.Projects.Jobs.Messages.List secondRequest = - mock(Dataflow.Projects.Jobs.Messages.List.class); - - when(mockMessages.list(PROJECT_ID, JOB_ID)).thenReturn(firstRequest).thenReturn(secondRequest); + DataflowClient dataflowClient = mock(DataflowClient.class); ListJobMessagesResponse firstResponse = new ListJobMessagesResponse(); firstResponse.setJobMessages(new ArrayList<JobMessage>()); @@ -87,15 +78,13 @@ public class MonitoringUtilTest { secondResponse.getJobMessages().add(message); } - when(firstRequest.execute()).thenReturn(firstResponse); - when(secondRequest.execute()).thenReturn(secondResponse); + when(dataflowClient.listJobMessages(JOB_ID, null)).thenReturn(firstResponse); + when(dataflowClient.listJobMessages(JOB_ID, pageToken)).thenReturn(secondResponse); - MonitoringUtil util = new MonitoringUtil(PROJECT_ID, mockMessages); + MonitoringUtil util = new MonitoringUtil(dataflowClient); List<JobMessage> messages = util.getJobMessages(JOB_ID, -1); - verify(secondRequest).setPageToken(pageToken); - assertEquals(150, messages.size()); }