Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 e9aa49f65 -> 9d21e89b0


HBASE-18081 The way we process connection preamble in SimpleRpcServer is broken


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9d21e89b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9d21e89b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9d21e89b

Branch: refs/heads/branch-1.3
Commit: 9d21e89b003243d6dca935659ab991d29486e423
Parents: e9aa49f
Author: zhangduo <zhang...@apache.org>
Authored: Sat May 20 21:50:21 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Sun May 21 21:15:26 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |  48 ++++----
 .../hadoop/hbase/ipc/AbstractTestIPC.java       |   4 +-
 .../ipc/TestRpcServerSlowConnectionSetup.java   | 116 +++++++++++++++++++
 3 files changed, 146 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9d21e89b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index c29c395..6b7ccff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -1270,6 +1270,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
     protected SocketChannel channel;
     private ByteBuffer data;
     private ByteBuffer dataLengthBuffer;
+    private ByteBuffer preambleBuffer;
     protected final ConcurrentLinkedDeque<Call> responseQueue = new 
ConcurrentLinkedDeque<Call>();
     private final Lock responseWriteLock = new ReentrantLock();
     private Counter rpcCount = new Counter(); // number of outstanding rpcs
@@ -1560,23 +1561,25 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
     }
 
     private int readPreamble() throws IOException {
-      int count;
-      // Check for 'HBas' magic.
-      this.dataLengthBuffer.flip();
-      if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
-        return doBadPreambleHandling("Expected HEADER=" +
-            Bytes.toStringBinary(HConstants.RPC_HEADER) +
-            " but received HEADER=" + 
Bytes.toStringBinary(dataLengthBuffer.array()) +
-            " from " + toString());
-      }
-      // Now read the next two bytes, the version and the auth to use.
-      ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
-      count = channelRead(channel, versionAndAuthBytes);
-      if (count < 0 || versionAndAuthBytes.remaining() > 0) {
+      if (preambleBuffer == null) {
+        preambleBuffer = ByteBuffer.allocate(6);
+      }
+      int count = channelRead(channel, preambleBuffer);
+      if (count < 0 || preambleBuffer.remaining() > 0) {
         return count;
       }
-      int version = versionAndAuthBytes.get(0);
-      byte authbyte = versionAndAuthBytes.get(1);
+      // Check for 'HBas' magic.
+      preambleBuffer.flip();
+      for (int i = 0; i < HConstants.RPC_HEADER.length; i++) {
+        if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) {
+          return doBadPreambleHandling("Expected HEADER=" +
+              Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received 
HEADER=" +
+              Bytes.toStringBinary(preambleBuffer.array(), 0, 
HConstants.RPC_HEADER.length) +
+              " from " + toString());
+        }
+      }
+      int version = preambleBuffer.get(HConstants.RPC_HEADER.length);
+      byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1);
       this.authMethod = AuthMethod.valueOf(authbyte);
       if (version != CURRENT_VERSION) {
         String msg = getFatalConnectionString(version, authbyte);
@@ -1610,7 +1613,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
         useSasl = true;
       }
 
-      dataLengthBuffer.clear();
+      preambleBuffer = null; // do not need it anymore
       connectionPreambleRead = true;
       return count;
     }
@@ -1632,10 +1635,15 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
      * @throws InterruptedException
      */
     public int readAndProcess() throws IOException, InterruptedException {
-      // Try and read in an int.  If new connection, the int will hold the 
'HBas' HEADER.  If it
-      // does, read in the rest of the connection preamble, the version and 
the auth method.
-      // Else it will be length of the data to read (or -1 if a ping).  We 
catch the integer
-      // length into the 4-byte this.dataLengthBuffer.
+      // If we have not read the connection setup preamble, look to see if 
that is on the wire.
+      if (!connectionPreambleRead) {
+        int count = readPreamble();
+        if (!connectionPreambleRead) {
+          return count;
+        }
+      }
+      // Try and read in an int. It will be length of the data to read (or -1 
if a ping). We catch
+      // the integer length into the 4-byte this.dataLengthBuffer.
       int count = read4Bytes();
       if (count < 0 || dataLengthBuffer.remaining() > 0) {
         return count;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d21e89b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index bbf8720..aea0feb 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -92,13 +92,13 @@ public abstract class AbstractTestIPC {
             @Override
             public EmptyResponseProto ping(RpcController controller, 
EmptyRequestProto request)
                 throws ServiceException {
-              return null;
+              return EmptyResponseProto.getDefaultInstance();
             }
 
             @Override
             public EmptyResponseProto error(RpcController controller, 
EmptyRequestProto request)
                 throws ServiceException {
-              return null;
+              return EmptyResponseProto.getDefaultInstance();
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d21e89b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
new file mode 100644
index 0000000..7695423
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.apache.hadoop.hbase.ipc.AbstractTestIPC.SERVICE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
+import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
+import org.apache.hadoop.hbase.security.AuthMethod;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category({ RPCTests.class, MediumTests.class })
+public class TestRpcServerSlowConnectionSetup {
+
+  private RpcServer server;
+
+  private Socket socket;
+
+  @Before
+  public void setUp() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    server = new RpcServer(null, "testRpcServer",
+        Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
+        new InetSocketAddress("localhost", 0), conf, new 
FifoRpcScheduler(conf, 1));
+    server.start();
+    socket = new Socket("localhost", server.getListenerAddress().getPort());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (socket != null) {
+      socket.close();
+    }
+    if (server != null) {
+      server.stop();
+    }
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException {
+    int rpcHeaderLen = HConstants.RPC_HEADER.length;
+    byte[] preamble = new byte[rpcHeaderLen + 2];
+    System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen);
+    preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
+    preamble[rpcHeaderLen + 1] = AuthMethod.SIMPLE.code;
+    socket.getOutputStream().write(preamble, 0, rpcHeaderLen + 1);
+    socket.getOutputStream().flush();
+    Thread.sleep(5000);
+    socket.getOutputStream().write(preamble, rpcHeaderLen + 1, 1);
+    socket.getOutputStream().flush();
+
+    ConnectionHeader header = ConnectionHeader.newBuilder()
+        
.setServiceName(TestRpcServiceProtos.TestProtobufRpcProto.getDescriptor().getFullName())
+        .setVersionInfo(ProtobufUtil.getVersionInfo()).build();
+    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
+    dos.writeInt(header.getSerializedSize());
+    header.writeTo(dos);
+    dos.flush();
+
+    int callId = 10;
+    RequestHeader requestHeader = 
RequestHeader.newBuilder().setCallId(callId).setMethodName("ping")
+        .setRequestParam(true).setTimeout(1000).build();
+    dos.writeInt(IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader,
+      EmptyRequestProto.getDefaultInstance()));
+    requestHeader.writeDelimitedTo(dos);
+    EmptyRequestProto.getDefaultInstance().writeDelimitedTo(dos);
+    dos.flush();
+
+    DataInputStream dis = new DataInputStream(new 
BufferedInputStream(socket.getInputStream()));
+    int size = dis.readInt();
+    ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(dis);
+    assertEquals(callId, responseHeader.getCallId());
+    EmptyResponseProto.Builder builder = EmptyResponseProto.newBuilder();
+    builder.mergeDelimitedFrom(dis);
+    assertEquals(size, 
IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader, builder.build()));
+  }
+}
\ No newline at end of file

Reply via email to