sijie closed pull request #1720: [table service] Intercepted storage server channel should shutdown the underlying managed channel
URL: https://github.com/apache/bookkeeper/pull/1720
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java index 76c763138d..e5206de6a3 100644 --- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java +++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java @@ -58,6 +58,7 @@ private final Optional<String> token; private final Channel channel; + private final StorageServerChannel interceptedServerChannel; @GuardedBy("this") private RootRangeServiceFutureStub rootRangeService; @@ -87,6 +88,11 @@ public StorageServerChannel(Endpoint endpoint, resolvedEndpoint.getPort()) .usePlaintext(usePlainText) .build(); + this.interceptedServerChannel = null; + } + + public Channel getGrpcChannel() { + return channel; } @VisibleForTesting @@ -97,8 +103,15 @@ public StorageServerChannel(ManagedChannel channel, protected StorageServerChannel(Channel channel, Optional<String> token) { + this(channel, token, null); + } + + private StorageServerChannel(Channel channel, + Optional<String> token, + StorageServerChannel interceptedServerChannel) { this.token = token; this.channel = channel; + this.interceptedServerChannel = interceptedServerChannel; } public synchronized RootRangeServiceFutureStub getRootRangeService() { @@ -153,13 +166,18 @@ public StorageServerChannel intercept(ClientInterceptor... 
interceptors) { interceptors); return new StorageServerChannel( interceptedChannel, - this.token); + this.token, + this); } @Override public void close() { - if (channel instanceof ManagedChannel) { - ((ManagedChannel) channel).shutdown(); + if (interceptedServerChannel != null) { + interceptedServerChannel.close(); + } else { + if (channel instanceof ManagedChannel) { + ((ManagedChannel) channel).shutdown(); + } } } } diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java index 1dd5f864aa..e1260665ad 100644 --- a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java +++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java @@ -19,6 +19,9 @@ package org.apache.bookkeeper.clients.impl.channel; import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import io.grpc.ManagedChannel; import io.grpc.Server; @@ -68,4 +71,13 @@ public void testBasic() { channel.close(); } + @Test + public void testIntercept() { + ManagedChannel channel = mock(ManagedChannel.class); + StorageServerChannel ssChannel = new StorageServerChannel(channel, Optional.empty()); + StorageServerChannel interceptedChannel = ssChannel.intercept(1L); + interceptedChannel.close(); + verify(channel, times(1)).shutdown(); + } + } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services