Repository: hbase
Updated Branches:
  refs/heads/branch-1 6e3da5a39 -> 6860ddca9


HBASE-18199 Race in NettyRpcConnection may cause call stuck in BufferCallBeforeInitHandler forever


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6860ddca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6860ddca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6860ddca

Branch: refs/heads/branch-1
Commit: 6860ddca9f1237129f50d6920d20b832f3ded50d
Parents: 6e3da5a
Author: zhangduo <[email protected]>
Authored: Sat Jun 10 18:49:34 2017 +0800
Committer: zhangduo <[email protected]>
Committed: Sat Jun 10 19:07:23 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/NettyRpcConnection.java    | 59 ++++++++++++++------
 1 file changed, 42 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6860ddca/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 9a90b09..1b31182 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -70,8 +70,8 @@ class NettyRpcConnection extends RpcConnection {
 
   private static final Log LOG = LogFactory.getLog(NettyRpcConnection.class);
 
-  private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
-      .newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
+  private static final ScheduledExecutorService RELOGIN_EXECUTOR =
+      Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
 
   private final NettyRpcClient rpcClient;
 
@@ -88,8 +88,8 @@ class NettyRpcConnection extends RpcConnection {
         rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
     this.rpcClient = rpcClient;
     byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
-    this.connectionHeaderPreamble = Unpooled.directBuffer(connectionHeaderPreamble.length)
-        .writeBytes(connectionHeaderPreamble);
+    this.connectionHeaderPreamble =
+        Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
     ConnectionHeader header = getConnectionHeader();
     this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
     this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
@@ -246,9 +246,23 @@ class NettyRpcConnection extends RpcConnection {
         }).channel();
   }
 
+  private void write(Channel ch, final Call call) {
+    ch.writeAndFlush(call).addListener(new ChannelFutureListener() {
+
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+        // Fail the call if we failed to write it out. This usually because the channel is
+        // closed. This is needed because we may shutdown the channel inside event loop and
+        // there may still be some pending calls in the event loop queue after us.
+        if (!future.isSuccess()) {
+          call.setException(toIOE(future.cause()));
+        }
+      }
+    });
+  }
+
   @Override
-  public synchronized void sendRequest(final Call call, HBaseRpcController hrc)
-      throws IOException {
+  public synchronized void sendRequest(final Call call, HBaseRpcController hrc) throws IOException {
     if (reloginInProgress) {
       throw new IOException("Can not send request because relogin is in progress.");
     }
@@ -274,18 +288,29 @@ class NettyRpcConnection extends RpcConnection {
             connect();
           }
           scheduleTimeoutTask(call);
-          channel.writeAndFlush(call).addListener(new ChannelFutureListener() {
-
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-              // Fail the call if we failed to write it out. This usually because the channel is
-              // closed. This is needed because we may shutdown the channel inside event loop and
-              // there may still be some pending calls in the event loop queue after us.
-              if (!future.isSuccess()) {
-                call.setException(toIOE(future.cause()));
+          final Channel ch = channel;
+          // We must move the whole writeAndFlush call inside event loop otherwise there will be a
+          // race condition.
+          // In netty's DefaultChannelPipeline, it will find the first outbound handler in the
+          // current thread and then schedule a task to event loop which will start the process from
+          // that outbound handler. It is possible that the first handler is
+          // BufferCallBeforeInitHandler when we call writeAndFlush here, but the connection is set
+          // up at the same time so in the event loop thread we remove the
+          // BufferCallBeforeInitHandler, and then our writeAndFlush task comes, still calls the
+          // write method of BufferCallBeforeInitHandler.
+          // This may be considered as a bug of netty, but anyway there is a work around so let's
+          // fix it by ourselves first.
+          if (ch.eventLoop().inEventLoop()) {
+            write(ch, call);
+          } else {
+            ch.eventLoop().execute(new Runnable() {
+
+              @Override
+              public void run() {
+                write(ch, call);
               }
-            }
-          });
+            });
+          }
         }
       }
     });

Reply via email to