This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 580d29e  [TABLE SERVICE] Intercepted storage server channel should 
shutdown the underlying managed channel
580d29e is described below

commit 580d29e47052fad058456c5028debe75701ea3a8
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Mon Oct 1 08:17:36 2018 -0700

    [TABLE SERVICE] Intercepted storage server channel should shutdown the 
underlying managed channel
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    When a storage server channel is intercepted, the underlying channel will 
not be `ManagedChannel`
    any more. so closing storage server channel will never close the underlying 
managed channel.
    
    *Changes*
    
    StorageServerChannel#intercept should keep reference to the original 
storage server channel.
    so when closing the intercepted channel, it can close the original channel.
    
    
    
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1720 from sijie/close_intercepted_channel
---
 .../clients/impl/channel/StorageServerChannel.java | 24 +++++++++++++++++++---
 .../impl/channel/TestStorageServerChannel.java     | 12 +++++++++++
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index 76c7631..e5206de 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -58,6 +58,7 @@ public class StorageServerChannel implements AutoCloseable {
 
     private final Optional<String> token;
     private final Channel channel;
+    private final StorageServerChannel interceptedServerChannel;
 
     @GuardedBy("this")
     private RootRangeServiceFutureStub rootRangeService;
@@ -87,6 +88,11 @@ public class StorageServerChannel implements AutoCloseable {
             resolvedEndpoint.getPort())
             .usePlaintext(usePlainText)
             .build();
+        this.interceptedServerChannel = null;
+    }
+
+    public Channel getGrpcChannel() {
+        return channel;
     }
 
     @VisibleForTesting
@@ -97,8 +103,15 @@ public class StorageServerChannel implements AutoCloseable {
 
     protected StorageServerChannel(Channel channel,
                                    Optional<String> token) {
+        this(channel, token, null);
+    }
+
+    private StorageServerChannel(Channel channel,
+                                 Optional<String> token,
+                                 StorageServerChannel 
interceptedServerChannel) {
         this.token = token;
         this.channel = channel;
+        this.interceptedServerChannel = interceptedServerChannel;
     }
 
     public synchronized RootRangeServiceFutureStub getRootRangeService() {
@@ -153,13 +166,18 @@ public class StorageServerChannel implements 
AutoCloseable {
             interceptors);
         return new StorageServerChannel(
             interceptedChannel,
-            this.token);
+            this.token,
+            this);
     }
 
     @Override
     public void close() {
-        if (channel instanceof ManagedChannel) {
-            ((ManagedChannel) channel).shutdown();
+        if (interceptedServerChannel != null) {
+            interceptedServerChannel.close();
+        } else {
+            if (channel instanceof ManagedChannel) {
+                ((ManagedChannel) channel).shutdown();
+            }
         }
     }
 }
diff --git 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
index 1dd5f86..e126066 100644
--- 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
+++ 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
@@ -19,6 +19,9 @@
 package org.apache.bookkeeper.clients.impl.channel;
 
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import io.grpc.ManagedChannel;
 import io.grpc.Server;
@@ -68,4 +71,13 @@ public class TestStorageServerChannel {
         channel.close();
     }
 
+    @Test
+    public void testIntercept() {
+        ManagedChannel channel = mock(ManagedChannel.class);
+        StorageServerChannel ssChannel = new StorageServerChannel(channel, 
Optional.empty());
+        StorageServerChannel interceptedChannel = ssChannel.intercept(1L);
+        interceptedChannel.close();
+        verify(channel, times(1)).shutdown();
+    }
+
 }

Reply via email to