Repository: incubator-beam
Updated Branches:
  refs/heads/master afedd68e8 -> b2b570f27


[BEAM-1047] Update dataflow runner code to use DataflowClient wrapper.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ce03f30c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ce03f30c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ce03f30c

Branch: refs/heads/master
Commit: ce03f30c1ee0b84ad2e7f10a6272ffb25548244a
Parents: e8c9686
Author: Pei He <pe...@google.com>
Authored: Mon Nov 28 11:47:42 2016 -0800
Committer: bchambers <bchamb...@google.com>
Committed: Tue Dec 6 17:08:12 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowClient.java   | 36 +++++++++++++-------
 .../runners/dataflow/DataflowPipelineJob.java   | 23 ++++++-------
 .../beam/runners/dataflow/DataflowRunner.java   | 16 +++------
 .../dataflow/testing/TestDataflowRunner.java    |  6 ++--
 .../runners/dataflow/util/MonitoringUtil.java   | 22 +++---------
 .../dataflow/DataflowPipelineJobTest.java       |  1 +
 .../transforms/DataflowGroupByKeyTest.java      | 14 +++++++-
 .../dataflow/transforms/DataflowViewTest.java   | 16 +++++++--
 .../dataflow/util/MonitoringUtilTest.java       | 21 +++---------
 9 files changed, 80 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
index f2081db..3536d72 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
@@ -35,27 +35,28 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 
 /**
- * Client library for {@link Dataflow}.
+ * Wrapper around the generated {@link Dataflow} client to provide common 
functionality.
  */
 public class DataflowClient {
 
   public static DataflowClient create(DataflowPipelineOptions options) {
-    return new DataflowClient(options.getDataflowClient(), options);
+    return new DataflowClient(options.getDataflowClient(), 
options.getProject());
   }
 
   private final Dataflow dataflow;
-  private final DataflowPipelineOptions options;
+  private final String projectId;
 
-  private DataflowClient(Dataflow dataflow, DataflowPipelineOptions options) {
+  private DataflowClient(Dataflow dataflow, String projectId) {
     this.dataflow = checkNotNull(dataflow, "dataflow");
-    this.options = checkNotNull(options, "options");
+    this.projectId = checkNotNull(projectId, "projectId");
   }
 
   /**
    * Creates the Dataflow {@link Job}.
    */
   public Job createJob(@Nonnull Job job) throws IOException {
-    Jobs.Create jobsCreate = 
dataflow.projects().jobs().create(options.getProject(), job);
+    checkNotNull(job, "job");
+    Jobs.Create jobsCreate = dataflow.projects().jobs().create(projectId, job);
     return jobsCreate.execute();
   }
 
@@ -65,7 +66,7 @@ public class DataflowClient {
    */
   public ListJobsResponse listJobs(@Nullable String pageToken) throws 
IOException {
     Jobs.List jobsList = dataflow.projects().jobs()
-        .list(options.getProject())
+        .list(projectId)
         .setPageToken(pageToken);
     return jobsList.execute();
   }
@@ -74,8 +75,10 @@ public class DataflowClient {
    * Updates the Dataflow {@link Job} with the given {@code jobId}.
    */
   public Job updateJob(@Nonnull String jobId, @Nonnull Job content) throws 
IOException {
+    checkNotNull(jobId, "jobId");
+    checkNotNull(content, "content");
     Jobs.Update jobsUpdate = dataflow.projects().jobs()
-        .update(options.getProject(), jobId, content);
+        .update(projectId, jobId, content);
     return jobsUpdate.execute();
   }
 
@@ -83,8 +86,9 @@ public class DataflowClient {
    * Gets the Dataflow {@link Job} with the given {@code jobId}.
    */
   public Job getJob(@Nonnull String jobId) throws IOException {
+    checkNotNull(jobId, "jobId");
     Jobs.Get jobsGet = dataflow.projects().jobs()
-        .get(options.getProject(), jobId);
+        .get(projectId, jobId);
     return jobsGet.execute();
   }
 
@@ -92,8 +96,9 @@ public class DataflowClient {
    * Gets the {@link JobMetrics} with the given {@code jobId}.
    */
   public JobMetrics getJobMetrics(@Nonnull String jobId) throws IOException {
+    checkNotNull(jobId, "jobId");
     Jobs.GetMetrics jobsGetMetrics = dataflow.projects().jobs()
-        .getMetrics(options.getProject(), jobId);
+        .getMetrics(projectId, jobId);
     return jobsGetMetrics.execute();
   }
 
@@ -102,8 +107,9 @@ public class DataflowClient {
    */
   public ListJobMessagesResponse listJobMessages(
       @Nonnull String jobId, @Nullable String pageToken) throws IOException {
+    checkNotNull(jobId, "jobId");
     Jobs.Messages.List jobMessagesList = dataflow.projects().jobs().messages()
-        .list(options.getProject(), jobId)
+        .list(projectId, jobId)
         .setPageToken(pageToken);
     return jobMessagesList.execute();
   }
@@ -113,8 +119,10 @@ public class DataflowClient {
    */
   public LeaseWorkItemResponse leaseWorkItem(
       @Nonnull String jobId, @Nonnull LeaseWorkItemRequest request) throws 
IOException {
+    checkNotNull(jobId, "jobId");
+    checkNotNull(request, "request");
     Jobs.WorkItems.Lease jobWorkItemsLease = 
dataflow.projects().jobs().workItems()
-        .lease(options.getProject(), jobId, request);
+        .lease(projectId, jobId, request);
     return jobWorkItemsLease.execute();
   }
 
@@ -123,8 +131,10 @@ public class DataflowClient {
    */
   public ReportWorkItemStatusResponse reportWorkItemStatus(
       @Nonnull String jobId, @Nonnull ReportWorkItemStatusRequest request) 
throws IOException {
+    checkNotNull(jobId, "jobId");
+    checkNotNull(request, "request");
     Jobs.WorkItems.ReportStatus jobWorkItemsReportStatus = 
dataflow.projects().jobs().workItems()
-        .reportStatus(options.getProject(), jobId, request);
+        .reportStatus(projectId, jobId, request);
     return jobWorkItemsReportStatus.execute();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 58e85e0..00c88f9 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -62,10 +62,15 @@ public class DataflowPipelineJob implements PipelineResult {
   private String jobId;
 
   /**
+   * The {@link DataflowPipelineOptions} for the job.
+   */
+  private final DataflowPipelineOptions dataflowOptions;
+
+  /**
    * Client for the Dataflow service. This can be used to query the service
    * for information about the job.
    */
-  private DataflowPipelineOptions dataflowOptions;
+  private final DataflowClient dataflowClient;
 
   /**
    * The state the job terminated in or {@code null} if the job has not 
terminated.
@@ -124,6 +129,7 @@ public class DataflowPipelineJob implements PipelineResult {
       DataflowAggregatorTransforms aggregatorTransforms) {
     this.jobId = jobId;
     this.dataflowOptions = dataflowOptions;
+    this.dataflowClient = (dataflowOptions == null ? null : 
DataflowClient.create(dataflowOptions));
     this.aggregatorTransforms = aggregatorTransforms;
   }
 
@@ -241,7 +247,7 @@ public class DataflowPipelineJob implements PipelineResult {
       MonitoringUtil.JobMessagesHandler messageHandler,
       Sleeper sleeper,
       NanoClock nanoClock) throws IOException, InterruptedException {
-    MonitoringUtil monitor = new MonitoringUtil(getProjectId(), 
dataflowOptions.getDataflowClient());
+    MonitoringUtil monitor = new MonitoringUtil(dataflowClient);
 
     long lastTimestamp = 0;
     BackOff backoff;
@@ -334,9 +340,7 @@ public class DataflowPipelineJob implements PipelineResult {
     content.setId(jobId);
     content.setRequestedState("JOB_STATE_CANCELLED");
     try {
-      dataflowOptions.getDataflowClient().projects().jobs()
-          .update(getProjectId(), jobId, content)
-          .execute();
+      dataflowClient.updateJob(jobId, content);
       return State.CANCELLED;
     } catch (IOException e) {
       State state = getState();
@@ -401,11 +405,7 @@ public class DataflowPipelineJob implements PipelineResult 
{
     // Retry loop ends in return or throw
     while (true) {
       try {
-        Job job = dataflowOptions.getDataflowClient()
-            .projects()
-            .jobs()
-            .get(getProjectId(), jobId)
-            .execute();
+        Job job = dataflowClient.getJob(jobId);
         State currentState = MonitoringUtil.toState(job.getCurrentState());
         if (currentState.isTerminal()) {
           terminalState = currentState;
@@ -476,8 +476,7 @@ public class DataflowPipelineJob implements PipelineResult {
         metricUpdates = terminalMetricUpdates;
       } else {
         boolean terminal = getState().isTerminal();
-        JobMetrics jobMetrics = dataflowOptions.getDataflowClient()
-            .projects().jobs().getMetrics(getProjectId(), jobId).execute();
+        JobMetrics jobMetrics = dataflowClient.getJobMetrics(jobId);
         metricUpdates = jobMetrics.getMetrics();
         if (terminal && jobMetrics.getMetrics() != null) {
           terminalMetricUpdates = metricUpdates;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index e781b4e..40d8948 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -32,7 +32,6 @@ import com.google.api.services.clouddebugger.v2.Clouddebugger;
 import com.google.api.services.clouddebugger.v2.model.Debuggee;
 import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest;
 import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
-import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.ListJobsResponse;
@@ -194,7 +193,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
   private final DataflowPipelineOptions options;
 
   /** Client for the Dataflow service. This is used to actually submit jobs. */
-  private final Dataflow dataflowClient;
+  private final DataflowClient dataflowClient;
 
   /** Translator for this DataflowRunner, based on options. */
   private final DataflowPipelineTranslator translator;
@@ -321,7 +320,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
 
   @VisibleForTesting protected DataflowRunner(DataflowPipelineOptions options) 
{
     this.options = options;
-    this.dataflowClient = options.getDataflowClient();
+    this.dataflowClient = DataflowClient.create(options);
     this.translator = DataflowPipelineTranslator.fromOptions(options);
     this.pcollectionsRequiringIndexedFormat = new HashSet<>();
     this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
@@ -597,11 +596,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     }
     Job jobResult;
     try {
-      jobResult = dataflowClient
-              .projects()
-              .jobs()
-              .create(options.getProject(), newJob)
-              .execute();
+      jobResult = dataflowClient.createJob(newJob);
     } catch (GoogleJsonResponseException e) {
       String errorMessages = "Unexpected errors";
       if (e.getDetails() != null) {
@@ -2830,10 +2825,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
       ListJobsResponse listResult;
       String token = null;
       do {
-        listResult = dataflowClient.projects().jobs()
-            .list(options.getProject())
-            .setPageToken(token)
-            .execute();
+        listResult = dataflowClient.listJobs(token);
         token = listResult.getNextPageToken();
         for (Job job : listResult.getJobs()) {
           if (job.getName().equals(jobName)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index 70c3f58..4b0fcf2 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.DataflowClient;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -65,11 +66,13 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
   private static final Logger LOG = 
LoggerFactory.getLogger(TestDataflowRunner.class);
 
   private final TestDataflowPipelineOptions options;
+  private final DataflowClient dataflowClient;
   private final DataflowRunner runner;
   private int expectedNumberOfAssertions = 0;
 
   TestDataflowRunner(TestDataflowPipelineOptions options) {
     this.options = options;
+    this.dataflowClient = DataflowClient.create(options);
     this.runner = DataflowRunner.fromOptions(options);
   }
 
@@ -279,8 +282,7 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
   JobMetrics getJobMetrics(DataflowPipelineJob job) {
     JobMetrics metrics = null;
     try {
-      metrics = options.getDataflowClient().projects().jobs()
-          .getMetrics(job.getProjectId(), job.getJobId()).execute();
+      metrics = dataflowClient.getJobMetrics(job.getJobId());
     } catch (IOException e) {
       LOG.warn("Failed to get job metrics: ", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
index efb6d2b..d0a24bf 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow.util;
 import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime;
 
 import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
 import com.google.api.services.dataflow.model.JobMessage;
 import com.google.api.services.dataflow.model.ListJobMessagesResponse;
 import com.google.common.base.MoreObjects;
@@ -35,6 +34,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.DataflowClient;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.joda.time.Instant;
@@ -67,8 +67,7 @@ public final class MonitoringUtil {
   private static final String JOB_MESSAGE_DETAILED = "JOB_MESSAGE_DETAILED";
   private static final String JOB_MESSAGE_DEBUG = "JOB_MESSAGE_DEBUG";
 
-  private String projectId;
-  private Messages messagesClient;
+  private final DataflowClient dataflowClient;
 
   /**
    * An interface that can be used for defining callbacks to receive a list
@@ -115,14 +114,8 @@ public final class MonitoringUtil {
   }
 
   /** Construct a helper for monitoring. */
-  public MonitoringUtil(String projectId, Dataflow dataflow) {
-    this(projectId, dataflow.projects().jobs().messages());
-  }
-
-  // @VisibleForTesting
-  MonitoringUtil(String projectId, Messages messagesClient) {
-    this.projectId = projectId;
-    this.messagesClient = messagesClient;
+  public MonitoringUtil(DataflowClient dataflowClient) {
+    this.dataflowClient = dataflowClient;
   }
 
   /**
@@ -157,12 +150,7 @@ public final class MonitoringUtil {
     ArrayList<JobMessage> allMessages = new ArrayList<>();
     String pageToken = null;
     while (true) {
-      Messages.List listRequest = messagesClient.list(projectId, jobId);
-      if (pageToken != null) {
-        listRequest.setPageToken(pageToken);
-      }
-      ListJobMessagesResponse response = listRequest.execute();
-
+      ListJobMessagesResponse response = dataflowClient.listJobMessages(jobId, 
pageToken);
       if (response == null || response.getJobMessages() == null) {
         return allMessages;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 323f762..1890da1 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -157,6 +157,7 @@ public class DataflowPipelineJobTest {
     Messages.List listRequest = 
mock(Dataflow.Projects.Jobs.Messages.List.class);
     when(mockJobs.messages()).thenReturn(mockMessages);
     when(mockMessages.list(eq(PROJECT_ID), 
eq(JOB_ID))).thenReturn(listRequest);
+    when(listRequest.setPageToken(eq((String) null))).thenReturn(listRequest);
     when(listRequest.execute()).thenThrow(SocketTimeoutException.class);
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
index bb84d98..67408ae 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.transforms;
 
+import com.google.api.services.dataflow.Dataflow;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -38,11 +39,14 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
 /** Tests for {@link GroupByKey} for the {@link DataflowRunner}. */
 @RunWith(JUnit4.class)
@@ -50,6 +54,14 @@ public class DataflowGroupByKeyTest {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
+  @Mock
+  private Dataflow dataflow;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
   /**
    * Create a test pipeline that uses the {@link DataflowRunner} so that 
{@link GroupByKey}
    * is not expanded. This is used for verifying that even without expansion 
the proper errors show
@@ -61,7 +73,7 @@ public class DataflowGroupByKeyTest {
     options.setProject("someproject");
     options.setGcpTempLocation("gs://staging");
     options.setPathValidatorClass(NoopPathValidator.class);
-    options.setDataflowClient(null);
+    options.setDataflowClient(dataflow);
     return Pipeline.create(options);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
index ed3f2cd..b9220af 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowViewTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.transforms;
 
+import com.google.api.services.dataflow.Dataflow;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
@@ -36,12 +37,15 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
 /** Tests for {@link View} for a {@link DataflowRunner}. */
 @RunWith(JUnit4.class)
@@ -49,13 +53,21 @@ public class DataflowViewTest {
   @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
+  @Mock
+  private Dataflow dataflow;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
   private Pipeline createTestBatchRunner() {
     DataflowPipelineOptions options = 
PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setRunner(DataflowRunner.class);
     options.setProject("someproject");
     options.setGcpTempLocation("gs://staging");
     options.setPathValidatorClass(NoopPathValidator.class);
-    options.setDataflowClient(null);
+    options.setDataflowClient(dataflow);
     return Pipeline.create(options);
   }
 
@@ -66,7 +78,7 @@ public class DataflowViewTest {
     options.setProject("someproject");
     options.setGcpTempLocation("gs://staging");
     options.setPathValidatorClass(NoopPathValidator.class);
-    options.setDataflowClient(null);
+    options.setDataflowClient(dataflow);
     return Pipeline.create(options);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce03f30c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
index 6c5a2be..23ed26f 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
@@ -19,16 +19,15 @@ package org.apache.beam.runners.dataflow.util;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.JobMessage;
 import com.google.api.services.dataflow.model.ListJobMessagesResponse;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.beam.runners.dataflow.DataflowClient;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil.LoggingHandler;
 import org.apache.beam.sdk.PipelineResult.State;
@@ -57,15 +56,7 @@ public class MonitoringUtilTest {
 
   @Test
   public void testGetJobMessages() throws IOException {
-    Dataflow.Projects.Jobs.Messages mockMessages = 
mock(Dataflow.Projects.Jobs.Messages.class);
-
-    // Two requests are needed to get all the messages.
-    Dataflow.Projects.Jobs.Messages.List firstRequest =
-        mock(Dataflow.Projects.Jobs.Messages.List.class);
-    Dataflow.Projects.Jobs.Messages.List secondRequest =
-        mock(Dataflow.Projects.Jobs.Messages.List.class);
-
-    when(mockMessages.list(PROJECT_ID, 
JOB_ID)).thenReturn(firstRequest).thenReturn(secondRequest);
+    DataflowClient dataflowClient = mock(DataflowClient.class);
 
     ListJobMessagesResponse firstResponse = new ListJobMessagesResponse();
     firstResponse.setJobMessages(new ArrayList<JobMessage>());
@@ -87,15 +78,13 @@ public class MonitoringUtilTest {
       secondResponse.getJobMessages().add(message);
     }
 
-    when(firstRequest.execute()).thenReturn(firstResponse);
-    when(secondRequest.execute()).thenReturn(secondResponse);
+    when(dataflowClient.listJobMessages(JOB_ID, 
null)).thenReturn(firstResponse);
+    when(dataflowClient.listJobMessages(JOB_ID, 
pageToken)).thenReturn(secondResponse);
 
-    MonitoringUtil util = new MonitoringUtil(PROJECT_ID, mockMessages);
+    MonitoringUtil util = new MonitoringUtil(dataflowClient);
 
     List<JobMessage> messages = util.getJobMessages(JOB_ID, -1);
 
-    verify(secondRequest).setPageToken(pageToken);
-
     assertEquals(150, messages.size());
   }
 

Reply via email to