This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new 580d29e [TABLE SERVICE] Intercepted storage server channel should shutdown the underlying managed channel 580d29e is described below commit 580d29e47052fad058456c5028debe75701ea3a8 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Mon Oct 1 08:17:36 2018 -0700 [TABLE SERVICE] Intercepted storage server channel should shutdown the underlying managed channel Descriptions of the changes in this PR: *Motivation* When a storage server channel is intercepted, the underlying channel will not be `ManagedChannel` any more. so closing storage server channel will never close the underlying managed channel. *Changes* StorageServerChannel#intercept should keep reference to the original storage server channel. so when closing the intercepted channel, it can close the original channel. Author: Sijie Guo <si...@apache.org> Reviewers: Jia Zhai <None> This closes #1720 from sijie/close_intercepted_channel --- .../clients/impl/channel/StorageServerChannel.java | 24 +++++++++++++++++++--- .../impl/channel/TestStorageServerChannel.java | 12 +++++++++++ 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java index 76c7631..e5206de 100644 --- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java +++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java @@ -58,6 +58,7 @@ public class StorageServerChannel implements AutoCloseable { private final Optional<String> token; private final Channel channel; + private final StorageServerChannel interceptedServerChannel; @GuardedBy("this") private RootRangeServiceFutureStub rootRangeService; @@ -87,6 +88,11 @@ public class StorageServerChannel implements AutoCloseable { resolvedEndpoint.getPort()) .usePlaintext(usePlainText) .build(); + this.interceptedServerChannel = null; + } + + public Channel getGrpcChannel() { + return channel; } @VisibleForTesting @@ -97,8 +103,15 @@ public class StorageServerChannel implements AutoCloseable { protected StorageServerChannel(Channel channel, Optional<String> token) { + this(channel, token, null); + } + + private StorageServerChannel(Channel channel, + Optional<String> token, + StorageServerChannel interceptedServerChannel) { this.token = token; this.channel = channel; + this.interceptedServerChannel = interceptedServerChannel; } public synchronized RootRangeServiceFutureStub getRootRangeService() { @@ -153,13 +166,18 @@ public class StorageServerChannel implements AutoCloseable { interceptors); return new StorageServerChannel( interceptedChannel, - this.token); + this.token, + this); } @Override public void close() { - if (channel instanceof ManagedChannel) { - ((ManagedChannel) channel).shutdown(); + if (interceptedServerChannel != null) { + interceptedServerChannel.close(); + } else { + if (channel instanceof ManagedChannel) { + ((ManagedChannel) channel).shutdown(); + } } } } diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java index 1dd5f86..e126066 100644 --- a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java +++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java @@ -19,6 +19,9 @@ package org.apache.bookkeeper.clients.impl.channel; import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import io.grpc.ManagedChannel; import io.grpc.Server; @@ -68,4 +71,13 @@ public class TestStorageServerChannel { channel.close(); } + @Test + public void testIntercept() { + ManagedChannel channel = mock(ManagedChannel.class); + StorageServerChannel ssChannel = new StorageServerChannel(channel, Optional.empty()); + StorageServerChannel interceptedChannel = ssChannel.intercept(1L); + interceptedChannel.close(); + verify(channel, times(1)).shutdown(); + } + }