Repository: incubator-beam Updated Branches: refs/heads/master 8245f9b4b -> 4872bde8f
Update PubsubGrpcClient to not take in mocks as they are not needed anymore Update PubsubGrpcClientTest to use an inprocess server to handle requests/responses for testing. Add/remove required dependencies found missing by dep-analyzer plugin during upgrade to protobuf 3.0.0 and grpc 1.0.1 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a447d130 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a447d130 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a447d130 Branch: refs/heads/master Commit: a447d130172ec7c270fc9f1d16a2f7404898461e Parents: f93ca9c Author: Luke Cwik <lc...@google.com> Authored: Wed Sep 21 19:49:10 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Thu Sep 22 14:37:00 2016 -0700 ---------------------------------------------------------------------- pom.xml | 24 +++++ runners/google-cloud-dataflow-java/pom.xml | 6 ++ sdks/java/core/pom.xml | 17 +++ .../apache/beam/sdk/util/PubsubGrpcClient.java | 34 ++---- .../beam/sdk/util/PubsubGrpcClientTest.java | 108 ++++++++++++------- sdks/java/io/google-cloud-platform/pom.xml | 5 + 6 files changed, 132 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a447d130/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 70373ec..cc93bb9 100644 --- a/pom.xml +++ b/pom.xml @@ -321,11 +321,35 @@ <dependency> <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + <version>${grpc.version}</version> + </dependency> + + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf-lite</artifactId> + <version>${grpc.version}</version> + </dependency> + + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-lite</artifactId> + <version>3.0.1</version> + </dependency> + + <dependency> + <groupId>io.grpc</groupId> <artifactId>grpc-netty</artifactId> <version>${grpc.version}</version> </dependency> <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + <version>${grpc.version}</version> + </dependency> + + <dependency> <groupId>com.google.api-client</groupId> <artifactId>google-api-client</artifactId> <version>${google-clients.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a447d130/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index bf66f38..acf6cce 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -321,6 +321,12 @@ </dependency> <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-lite</artifactId> + <scope>runtime</scope> + </dependency> + + <dependency> <groupId>com.google.code.findbugs</groupId> <artifactId>annotations</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a447d130/sdks/java/core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 1cf7ba5..c4d3e64 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -308,6 +308,11 @@ <artifactId>grpc-netty</artifactId> </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + </dependency> + <!-- grpc-all does not obey IWYU, so we need to exclude from compile scope and depend on it at runtime. --> <dependency> @@ -317,6 +322,18 @@ </dependency> <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf-lite</artifactId> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-lite</artifactId> + <scope>runtime</scope> + </dependency> + + <dependency> <groupId>com.google.auth</groupId> <artifactId>google-auth-library-oauth2-http</artifactId> <version>0.4.0</version> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a447d130/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java index 988b90f..02152ba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java @@ -100,9 +100,7 @@ public class PubsubGrpcClient extends PubsubClient { idLabel, DEFAULT_TIMEOUT_S, channel, - credentials, - null /* publisher stub */, - null /* subscriber stub */); + credentials); } @Override @@ -159,16 +157,12 @@ public class PubsubGrpcClient extends PubsubClient { @Nullable String idLabel, int timeoutSec, ManagedChannel publisherChannel, - GoogleCredentials credentials, - PublisherGrpc.PublisherBlockingStub cachedPublisherStub, - SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub) { + GoogleCredentials credentials) { this.timestampLabel = timestampLabel; this.idLabel = idLabel; this.timeoutSec = timeoutSec; this.publisherChannel = publisherChannel; this.credentials = credentials; - this.cachedPublisherStub = cachedPublisherStub; - this.cachedSubscriberStub = cachedSubscriberStub; } /** @@ -189,13 +183,11 @@ public class PubsubGrpcClient extends PubsubClient { this.publisherChannel = null; // Gracefully shutdown the channel. publisherChannel.shutdown(); - if (timeoutSec > 0) { - try { - publisherChannel.awaitTermination(timeoutSec, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // Ignore. - Thread.currentThread().interrupt(); - } + try { + publisherChannel.awaitTermination(timeoutSec, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Ignore. + Thread.currentThread().interrupt(); } } @@ -216,11 +208,7 @@ public class PubsubGrpcClient extends PubsubClient { if (cachedPublisherStub == null) { cachedPublisherStub = PublisherGrpc.newBlockingStub(newChannel()); } - if (timeoutSec > 0) { - return cachedPublisherStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS); - } else { - return cachedPublisherStub; - } + return cachedPublisherStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS); } /** @@ -230,11 +218,7 @@ public class PubsubGrpcClient extends PubsubClient { if (cachedSubscriberStub == null) { cachedSubscriberStub = SubscriberGrpc.newBlockingStub(newChannel()); } - if (timeoutSec > 0) { - return cachedSubscriberStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS); - } else { - return cachedSubscriberStub; - } + return cachedSubscriberStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a447d130/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java index b36b934..cbdf5da 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java @@ -23,19 +23,26 @@ import static org.junit.Assert.assertEquals; import com.google.auth.oauth2.GoogleCredentials; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; -import com.google.pubsub.v1.PublisherGrpc; +import com.google.pubsub.v1.PublisherGrpc.PublisherImplBase; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PullRequest; import com.google.pubsub.v1.PullResponse; import com.google.pubsub.v1.ReceivedMessage; -import com.google.pubsub.v1.SubscriberGrpc; +import com.google.pubsub.v1.SubscriberGrpc.SubscriberImplBase; import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; @@ -52,12 +59,11 @@ import org.mockito.Mockito; */ @RunWith(JUnit4.class) public class PubsubGrpcClientTest { - private ManagedChannel mockChannel; + private ManagedChannel inProcessChannel; private GoogleCredentials mockCredentials; - private PublisherGrpc.PublisherBlockingStub mockPublisherStub; - private SubscriberGrpc.SubscriberBlockingStub mockSubscriberStub; private PubsubClient client; + private String channelName; private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); private static final SubscriptionPath SUBSCRIPTION = @@ -73,31 +79,24 @@ public class PubsubGrpcClientTest { private static final String ACK_ID = "testAckId"; @Before - public void setup() throws IOException { - mockChannel = Mockito.mock(ManagedChannel.class); + public void setup() { + channelName = String.format("%s-%s", + PubsubGrpcClientTest.class.getName(), ThreadLocalRandom.current().nextInt()); + inProcessChannel = InProcessChannelBuilder.forName(channelName).directExecutor().build(); mockCredentials = Mockito.mock(GoogleCredentials.class); - mockPublisherStub = - Mockito.mock(PublisherGrpc.PublisherBlockingStub.class, Mockito.RETURNS_DEEP_STUBS); - mockSubscriberStub = - Mockito.mock(SubscriberGrpc.SubscriberBlockingStub.class, Mockito.RETURNS_DEEP_STUBS); - client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 0, mockChannel, - mockCredentials, mockPublisherStub, mockSubscriberStub); + client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 10, inProcessChannel, mockCredentials); } @After public void teardown() throws IOException { client.close(); - client = null; - mockChannel = null; - mockCredentials = null; - mockPublisherStub = null; - mockSubscriberStub = null; + inProcessChannel.shutdownNow(); } @Test public void pullOneMessage() throws IOException { String expectedSubscription = SUBSCRIPTION.getPath(); - PullRequest expectedRequest = + final PullRequest expectedRequest = PullRequest.newBuilder() .setSubscription(expectedSubscription) .setReturnImmediately(true) @@ -123,20 +122,37 @@ public class PubsubGrpcClientTest { .setMessage(expectedPubsubMessage) .setAckId(ACK_ID) .build(); - PullResponse expectedResponse = + final PullResponse response = PullResponse.newBuilder() .addAllReceivedMessages(ImmutableList.of(expectedReceivedMessage)) .build(); - Mockito.when(mockSubscriberStub.pull(expectedRequest)) - .thenReturn(expectedResponse); - List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); - assertEquals(1, acutalMessages.size()); - IncomingMessage actualMessage = acutalMessages.get(0); - assertEquals(ACK_ID, actualMessage.ackId); - assertEquals(DATA, new String(actualMessage.elementBytes)); - assertEquals(RECORD_ID, actualMessage.recordId); - assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); - assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); + + final List<PullRequest> requestsReceived = new ArrayList<>(); + SubscriberImplBase subscriberImplBase = new SubscriberImplBase() { + @Override + public void pull(PullRequest request, StreamObserver<PullResponse> responseObserver) { + requestsReceived.add(request); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + }; + Server server = InProcessServerBuilder.forName(channelName) + .addService(subscriberImplBase) + .build() + .start(); + try { + List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); + assertEquals(1, acutalMessages.size()); + IncomingMessage actualMessage = acutalMessages.get(0); + assertEquals(ACK_ID, actualMessage.ackId); + assertEquals(DATA, new String(actualMessage.elementBytes)); + assertEquals(RECORD_ID, actualMessage.recordId); + assertEquals(REQ_TIME, actualMessage.requestTimeMsSinceEpoch); + assertEquals(MESSAGE_TIME, actualMessage.timestampMsSinceEpoch); + assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived)); + } finally { + server.shutdownNow(); + } } @Test @@ -149,20 +165,38 @@ public class PubsubGrpcClientTest { ImmutableMap.of(TIMESTAMP_LABEL, String.valueOf(MESSAGE_TIME), ID_LABEL, RECORD_ID)) .build(); - PublishRequest expectedRequest = + final PublishRequest expectedRequest = PublishRequest.newBuilder() .setTopic(expectedTopic) .addAllMessages( ImmutableList.of(expectedPubsubMessage)) .build(); - PublishResponse expectedResponse = + final PublishResponse response = PublishResponse.newBuilder() .addAllMessageIds(ImmutableList.of(MESSAGE_ID)) .build(); - Mockito.when(mockPublisherStub.publish(expectedRequest)) - .thenReturn(expectedResponse); - OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, RECORD_ID); - int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); - assertEquals(1, n); + + final List<PublishRequest> requestsReceived = new ArrayList<>(); + PublisherImplBase publisherImplBase = new PublisherImplBase() { + @Override + public void publish( + PublishRequest request, StreamObserver<PublishResponse> responseObserver) { + requestsReceived.add(request); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + }; + Server server = InProcessServerBuilder.forName(channelName) + .addService(publisherImplBase) + .build() + .start(); + try { + OutgoingMessage actualMessage = new OutgoingMessage(DATA.getBytes(), MESSAGE_TIME, RECORD_ID); + int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); + assertEquals(1, n); + assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived)); + } finally { + server.shutdownNow(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a447d130/sdks/java/io/google-cloud-platform/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index a0f3fef..573c457 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -124,6 +124,11 @@ </dependency> <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-lite</artifactId> + </dependency> + + <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-core</artifactId> </dependency>