Repository: incubator-beam
Updated Branches:
  refs/heads/master 8245f9b4b -> 4872bde8f


Update PubsubGrpcClient so that it no longer takes in mock stubs, as they are
not needed anymore.
Update PubsubGrpcClientTest to use an in-process gRPC server to handle
requests/responses for testing.
Add/remove dependencies as reported by the dep-analyzer plugin during the
upgrade to protobuf 3.0.0 and gRPC 1.0.1.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a447d130
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a447d130
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a447d130

Branch: refs/heads/master
Commit: a447d130172ec7c270fc9f1d16a2f7404898461e
Parents: f93ca9c
Author: Luke Cwik <lc...@google.com>
Authored: Wed Sep 21 19:49:10 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Sep 22 14:37:00 2016 -0700

----------------------------------------------------------------------
 pom.xml                                         |  24 +++++
 runners/google-cloud-dataflow-java/pom.xml      |   6 ++
 sdks/java/core/pom.xml                          |  17 +++
 .../apache/beam/sdk/util/PubsubGrpcClient.java  |  34 ++----
 .../beam/sdk/util/PubsubGrpcClientTest.java     | 108 ++++++++++++-------
 sdks/java/io/google-cloud-platform/pom.xml      |   5 +
 6 files changed, 132 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a447d130/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 70373ec..cc93bb9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -321,11 +321,35 @@
 
       <dependency>
         <groupId>io.grpc</groupId>
+        <artifactId>grpc-protobuf</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-protobuf-lite</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>com.google.protobuf</groupId>
+        <artifactId>protobuf-lite</artifactId>
+        <version>3.0.1</version>
+      </dependency>
+
+      <dependency>
+        <groupId>io.grpc</groupId>
         <artifactId>grpc-netty</artifactId>
         <version>${grpc.version}</version>
       </dependency>
 
       <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-stub</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>com.google.api-client</groupId>
         <artifactId>google-api-client</artifactId>
         <version>${google-clients.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a447d130/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml 
b/runners/google-cloud-dataflow-java/pom.xml
index bf66f38..acf6cce 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -321,6 +321,12 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-lite</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>annotations</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a447d130/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 1cf7ba5..c4d3e64 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -308,6 +308,11 @@
       <artifactId>grpc-netty</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+    </dependency>
+
     <!-- grpc-all does not obey IWYU, so we need to exclude from compile
          scope and depend on it at runtime. -->
     <dependency>
@@ -317,6 +322,18 @@
     </dependency>
 
     <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-protobuf-lite</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-lite</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.auth</groupId>
       <artifactId>google-auth-library-oauth2-http</artifactId>
       <version>0.4.0</version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a447d130/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
index 988b90f..02152ba 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
@@ -100,9 +100,7 @@ public class PubsubGrpcClient extends PubsubClient {
                                   idLabel,
                                   DEFAULT_TIMEOUT_S,
                                   channel,
-                                  credentials,
-                                  null /* publisher stub */,
-                                  null /* subscriber stub */);
+                                  credentials);
     }
 
     @Override
@@ -159,16 +157,12 @@ public class PubsubGrpcClient extends PubsubClient {
       @Nullable String idLabel,
       int timeoutSec,
       ManagedChannel publisherChannel,
-      GoogleCredentials credentials,
-      PublisherGrpc.PublisherBlockingStub cachedPublisherStub,
-      SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub) {
+      GoogleCredentials credentials) {
     this.timestampLabel = timestampLabel;
     this.idLabel = idLabel;
     this.timeoutSec = timeoutSec;
     this.publisherChannel = publisherChannel;
     this.credentials = credentials;
-    this.cachedPublisherStub = cachedPublisherStub;
-    this.cachedSubscriberStub = cachedSubscriberStub;
   }
 
   /**
@@ -189,13 +183,11 @@ public class PubsubGrpcClient extends PubsubClient {
     this.publisherChannel = null;
     // Gracefully shutdown the channel.
     publisherChannel.shutdown();
-    if (timeoutSec > 0) {
-      try {
-        publisherChannel.awaitTermination(timeoutSec, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        // Ignore.
-        Thread.currentThread().interrupt();
-      }
+    try {
+      publisherChannel.awaitTermination(timeoutSec, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // Ignore.
+      Thread.currentThread().interrupt();
     }
   }
 
@@ -216,11 +208,7 @@ public class PubsubGrpcClient extends PubsubClient {
     if (cachedPublisherStub == null) {
       cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel());
     }
-    if (timeoutSec > 0) {
-      return cachedPublisherStub.withDeadlineAfter(timeoutSec, 
TimeUnit.SECONDS);
-    } else {
-      return cachedPublisherStub;
-    }
+    return cachedPublisherStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS);
   }
 
   /**
@@ -230,11 +218,7 @@ public class PubsubGrpcClient extends PubsubClient {
     if (cachedSubscriberStub == null) {
       cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel());
     }
-    if (timeoutSec > 0) {
-      return cachedSubscriberStub.withDeadlineAfter(timeoutSec, 
TimeUnit.SECONDS);
-    } else {
-      return cachedSubscriberStub;
-    }
+    return cachedSubscriberStub.withDeadlineAfter(timeoutSec, 
TimeUnit.SECONDS);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a447d130/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
index b36b934..cbdf5da 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java
@@ -23,19 +23,26 @@ import static org.junit.Assert.assertEquals;
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Timestamp;
 import com.google.pubsub.v1.PublishRequest;
 import com.google.pubsub.v1.PublishResponse;
-import com.google.pubsub.v1.PublisherGrpc;
+import com.google.pubsub.v1.PublisherGrpc.PublisherImplBase;
 import com.google.pubsub.v1.PubsubMessage;
 import com.google.pubsub.v1.PullRequest;
 import com.google.pubsub.v1.PullResponse;
 import com.google.pubsub.v1.ReceivedMessage;
-import com.google.pubsub.v1.SubscriberGrpc;
+import com.google.pubsub.v1.SubscriberGrpc.SubscriberImplBase;
 import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
 import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
 import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
@@ -52,12 +59,11 @@ import org.mockito.Mockito;
  */
 @RunWith(JUnit4.class)
 public class PubsubGrpcClientTest {
-  private ManagedChannel mockChannel;
+  private ManagedChannel inProcessChannel;
   private GoogleCredentials mockCredentials;
-  private PublisherGrpc.PublisherBlockingStub mockPublisherStub;
-  private SubscriberGrpc.SubscriberBlockingStub mockSubscriberStub;
 
   private PubsubClient client;
+  private String channelName;
 
   private static final TopicPath TOPIC = 
PubsubClient.topicPathFromName("testProject", "testTopic");
   private static final SubscriptionPath SUBSCRIPTION =
@@ -73,31 +79,24 @@ public class PubsubGrpcClientTest {
   private static final String ACK_ID = "testAckId";
 
   @Before
-  public void setup() throws IOException {
-    mockChannel = Mockito.mock(ManagedChannel.class);
+  public void setup() {
+    channelName = String.format("%s-%s",
+        PubsubGrpcClientTest.class.getName(), 
ThreadLocalRandom.current().nextInt());
+    inProcessChannel = 
InProcessChannelBuilder.forName(channelName).directExecutor().build();
     mockCredentials = Mockito.mock(GoogleCredentials.class);
-    mockPublisherStub =
-        Mockito.mock(PublisherGrpc.PublisherBlockingStub.class, 
Mockito.RETURNS_DEEP_STUBS);
-    mockSubscriberStub =
-        Mockito.mock(SubscriberGrpc.SubscriberBlockingStub.class, 
Mockito.RETURNS_DEEP_STUBS);
-    client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 0, mockChannel,
-                                  mockCredentials, mockPublisherStub, 
mockSubscriberStub);
+    client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 10, 
inProcessChannel, mockCredentials);
   }
 
   @After
   public void teardown() throws IOException {
     client.close();
-    client = null;
-    mockChannel = null;
-    mockCredentials = null;
-    mockPublisherStub = null;
-    mockSubscriberStub = null;
+    inProcessChannel.shutdownNow();
   }
 
   @Test
   public void pullOneMessage() throws IOException {
     String expectedSubscription = SUBSCRIPTION.getPath();
-    PullRequest expectedRequest =
+    final PullRequest expectedRequest =
         PullRequest.newBuilder()
                    .setSubscription(expectedSubscription)
                    .setReturnImmediately(true)
@@ -123,20 +122,37 @@ public class PubsubGrpcClientTest {
                        .setMessage(expectedPubsubMessage)
                        .setAckId(ACK_ID)
                        .build();
-    PullResponse expectedResponse =
+    final PullResponse response =
         PullResponse.newBuilder()
                     
.addAllReceivedMessages(ImmutableList.of(expectedReceivedMessage))
                     .build();
-    Mockito.when(mockSubscriberStub.pull(expectedRequest))
-           .thenReturn(expectedResponse);
-    List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 
10, true);
-    assertEquals(1, acutalMessages.size());
-    IncomingMessage actualMessage = acutalMessages.get(0);
-    assertEquals(ACK_ID, actualMessage.ackId);
-    assertEquals(DATA, new String(actualMessage.elementBytes));
-    assertEquals(RECORD_ID, actualMessage.recordId);
-    assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
-    assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
+
+    final List<PullRequest> requestsReceived = new ArrayList<>();
+    SubscriberImplBase subscriberImplBase = new SubscriberImplBase() {
+      @Override
+      public void pull(PullRequest request, StreamObserver<PullResponse> 
responseObserver) {
+        requestsReceived.add(request);
+        responseObserver.onNext(response);
+        responseObserver.onCompleted();
+      }
+    };
+    Server server = InProcessServerBuilder.forName(channelName)
+        .addService(subscriberImplBase)
+        .build()
+        .start();
+    try {
+      List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, 
SUBSCRIPTION, 10, true);
+      assertEquals(1, acutalMessages.size());
+      IncomingMessage actualMessage = acutalMessages.get(0);
+      assertEquals(ACK_ID, actualMessage.ackId);
+      assertEquals(DATA, new String(actualMessage.elementBytes));
+      assertEquals(RECORD_ID, actualMessage.recordId);
+      assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch);
+      assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch);
+      assertEquals(expectedRequest, 
Iterables.getOnlyElement(requestsReceived));
+    } finally {
+      server.shutdownNow();
+    }
   }
 
   @Test
@@ -149,20 +165,38 @@ public class PubsubGrpcClientTest {
                          ImmutableMap.of(TIMESTAMP_LABEL, 
String.valueOf(MESSAGE_TIME),
                                          ID_LABEL, RECORD_ID))
                      .build();
-    PublishRequest expectedRequest =
+    final PublishRequest expectedRequest =
         PublishRequest.newBuilder()
                       .setTopic(expectedTopic)
                       .addAllMessages(
                           ImmutableList.of(expectedPubsubMessage))
                       .build();
-    PublishResponse expectedResponse =
+    final PublishResponse response =
         PublishResponse.newBuilder()
                        .addAllMessageIds(ImmutableList.of(MESSAGE_ID))
                        .build();
-    Mockito.when(mockPublisherStub.publish(expectedRequest))
-           .thenReturn(expectedResponse);
-    OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), 
MESSAGE_TIME, RECORD_ID);
-    int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
-    assertEquals(1, n);
+
+    final List<PublishRequest> requestsReceived = new ArrayList<>();
+    PublisherImplBase publisherImplBase = new PublisherImplBase() {
+      @Override
+      public void publish(
+          PublishRequest request, StreamObserver<PublishResponse> 
responseObserver) {
+        requestsReceived.add(request);
+        responseObserver.onNext(response);
+        responseObserver.onCompleted();
+      }
+    };
+    Server server = InProcessServerBuilder.forName(channelName)
+        .addService(publisherImplBase)
+        .build()
+        .start();
+    try {
+      OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), 
MESSAGE_TIME, RECORD_ID);
+      int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
+      assertEquals(1, n);
+      assertEquals(expectedRequest, 
Iterables.getOnlyElement(requestsReceived));
+    } finally {
+      server.shutdownNow();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a447d130/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml 
b/sdks/java/io/google-cloud-platform/pom.xml
index a0f3fef..573c457 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -124,6 +124,11 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-lite</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-core</artifactId>
     </dependency>

Reply via email to