Repository: hbase Updated Branches: refs/heads/branch-1.2 5c82c8236 -> 71bf5afa3
HBASE-18081 The way we process connection preamble in SimpleRpcServer is broken Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/71bf5afa Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/71bf5afa Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/71bf5afa Branch: refs/heads/branch-1.2 Commit: 71bf5afa37343245017ab755a59d9fc3fe3aa6ce Parents: 5c82c82 Author: zhangduo <zhang...@apache.org> Authored: Sat May 20 21:58:45 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Sun May 21 22:03:58 2017 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 48 ++++---- .../hadoop/hbase/ipc/AbstractTestIPC.java | 4 +- .../ipc/TestRpcServerSlowConnectionSetup.java | 116 +++++++++++++++++++ 3 files changed, 146 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/71bf5afa/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 2fde8a8..45b0c18 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1192,6 +1192,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected SocketChannel channel; private ByteBuffer data; private ByteBuffer dataLengthBuffer; + private ByteBuffer preambleBuffer; protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>(); private final Lock responseWriteLock = new ReentrantLock(); private Counter rpcCount = new Counter(); // number of outstanding rpcs @@ -1480,23 +1481,25 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } private int readPreamble() throws IOException { - int count; - // Check for 'HBas' magic. - this.dataLengthBuffer.flip(); - if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) { - return doBadPreambleHandling("Expected HEADER=" + - Bytes.toStringBinary(HConstants.RPC_HEADER) + - " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) + - " from " + toString()); - } - // Now read the next two bytes, the version and the auth to use. - ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2); - count = channelRead(channel, versionAndAuthBytes); - if (count < 0 || versionAndAuthBytes.remaining() > 0) { + if (preambleBuffer == null) { + preambleBuffer = ByteBuffer.allocate(6); + } + int count = channelRead(channel, preambleBuffer); + if (count < 0 || preambleBuffer.remaining() > 0) { return count; } - int version = versionAndAuthBytes.get(0); - byte authbyte = versionAndAuthBytes.get(1); + // Check for 'HBas' magic. + preambleBuffer.flip(); + for (int i = 0; i < HConstants.RPC_HEADER.length; i++) { + if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) { + return doBadPreambleHandling("Expected HEADER=" + + Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" + + Bytes.toStringBinary(preambleBuffer.array(), 0, HConstants.RPC_HEADER.length) + + " from " + toString()); + } + } + int version = preambleBuffer.get(HConstants.RPC_HEADER.length); + byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1); this.authMethod = AuthMethod.valueOf(authbyte); if (version != CURRENT_VERSION) { String msg = getFatalConnectionString(version, authbyte); @@ -1530,7 +1533,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { useSasl = true; } - dataLengthBuffer.clear(); + preambleBuffer = null; // do not need it anymore connectionPreambleRead = true; return count; } @@ -1552,10 +1555,15 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * @throws InterruptedException */ public int readAndProcess() throws IOException, InterruptedException { - // Try and read in an int. If new connection, the int will hold the 'HBas' HEADER. If it - // does, read in the rest of the connection preamble, the version and the auth method. - // Else it will be length of the data to read (or -1 if a ping). We catch the integer - // length into the 4-byte this.dataLengthBuffer. + // If we have not read the connection setup preamble, look to see if that is on the wire. + if (!connectionPreambleRead) { + int count = readPreamble(); + if (!connectionPreambleRead) { + return count; + } + } + // Try and read in an int. It will be length of the data to read (or -1 if a ping). We catch + // the integer length into the 4-byte this.dataLengthBuffer. int count = read4Bytes(); if (count < 0 || dataLengthBuffer.remaining() > 0) { return count; http://git-wip-us.apache.org/repos/asf/hbase/blob/71bf5afa/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index ffe4d40..ab0fcce 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -92,13 +92,13 @@ public abstract class AbstractTestIPC { @Override public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) throws ServiceException { - return null; + return EmptyResponseProto.getDefaultInstance(); } @Override public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) throws ServiceException { - return null; + return EmptyResponseProto.getDefaultInstance(); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/71bf5afa/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java new file mode 100644 index 0000000..548cc4b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.apache.hadoop.hbase.ipc.AbstractTestIPC.SERVICE; +import static org.junit.Assert.assertEquals; + +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; +import org.apache.hadoop.hbase.security.AuthMethod; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +@Category({ RPCTests.class, MediumTests.class }) +public class TestRpcServerSlowConnectionSetup { + + private RpcServer server; + + private Socket socket; + + @Before + public void setUp() throws IOException { + Configuration conf = HBaseConfiguration.create(); + server = new RpcServer(null, "testRpcServer", + Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1)); + server.start(); + socket = new Socket("localhost", server.getListenerAddress().getPort()); + } + + @After + public void tearDown() throws IOException { + if (socket != null) { + socket.close(); + } + if (server != null) { + server.stop(); + } + } + + @Test + public void test() throws IOException, InterruptedException { + int rpcHeaderLen = HConstants.RPC_HEADER.length; + byte[] preamble = new byte[rpcHeaderLen + 2]; + System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen); + preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION; + preamble[rpcHeaderLen + 1] = AuthMethod.SIMPLE.code; + socket.getOutputStream().write(preamble, 0, rpcHeaderLen + 1); + socket.getOutputStream().flush(); + Thread.sleep(5000); + socket.getOutputStream().write(preamble, rpcHeaderLen + 1, 1); + socket.getOutputStream().flush(); + + ConnectionHeader header = ConnectionHeader.newBuilder() + .setServiceName(TestRpcServiceProtos.TestProtobufRpcProto.getDescriptor().getFullName()) + .setVersionInfo(ProtobufUtil.getVersionInfo()).build(); + DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); + dos.writeInt(header.getSerializedSize()); + header.writeTo(dos); + dos.flush(); + + int callId = 10; + RequestHeader requestHeader = RequestHeader.newBuilder().setCallId(callId).setMethodName("ping") + .setRequestParam(true).build(); + dos.writeInt(IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, + EmptyRequestProto.getDefaultInstance())); + requestHeader.writeDelimitedTo(dos); + EmptyRequestProto.getDefaultInstance().writeDelimitedTo(dos); + dos.flush(); + + DataInputStream dis = new DataInputStream(new BufferedInputStream(socket.getInputStream())); + int size = dis.readInt(); + ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(dis); + assertEquals(callId, responseHeader.getCallId()); + EmptyResponseProto.Builder builder = EmptyResponseProto.newBuilder(); + builder.mergeDelimitedFrom(dis); + assertEquals(size, IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader, builder.build())); + } +} \ No newline at end of file