http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index c238adb..4fa58ad 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -17,300 +17,173 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import java.io.DataInput;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.BufferOverflowException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
-import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.CodedOutputStream;
-import com.google.protobuf.Message;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.ipc.RemoteException;
 
 /**
  * Utility to help ipc'ing.
  */
 @InterfaceAudience.Private
-public class IPCUtil {
-  // LOG is being used in TestIPCUtil
-  public static final Log LOG = LogFactory.getLog(IPCUtil.class);
-  /**
-   * How much we think the decompressor will expand the original compressed 
content.
-   */
-  private final int cellBlockDecompressionMultiplier;
-  private final int cellBlockBuildingInitialBufferSize;
-  private final Configuration conf;
-
-  public IPCUtil(final Configuration conf) {
-    super();
-    this.conf = conf;
-    this.cellBlockDecompressionMultiplier =
-        conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 
3);
-
-    // Guess that 16k is a good size for rpc buffer.  Could go bigger.  See 
the TODO below in
-    // #buildCellBlock.
-    this.cellBlockBuildingInitialBufferSize =
-      
ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 
16 * 1024));
-  }
+class IPCUtil {
 
   /**
-   * Thrown if a cellscanner but no codec to encode it with.
-   */
-  public static class CellScannerButNoCodecException extends HBaseIOException 
{};
-
-  /**
-   * Puts CellScanner Cells into a cell block using passed in 
<code>codec</code> and/or
-   * <code>compressor</code>.
-   * @param codec
-   * @param compressor
-   * @param cellScanner
-   * @return Null or byte buffer filled with a cellblock filled with passed-in 
Cells encoded using
-   * passed in <code>codec</code> and/or <code>compressor</code>; the returned 
buffer has been
-   * flipped and is ready for reading.  Use limit to find total size.
-   * @throws IOException
+   * Write out header, param, and cell block if there is one.
+   * @param dos Stream to write into
+   * @param header to write
+   * @param param to write
+   * @param cellBlock to write
+   * @return Total number of bytes written.
+   * @throws IOException if write action fails
    */
-  @SuppressWarnings("resource")
-  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec 
compressor,
-    final CellScanner cellScanner)
-  throws IOException {
-    return buildCellBlock(codec, compressor, cellScanner, null);
+  public static int write(final OutputStream dos, final Message header, final 
Message param,
+      final ByteBuffer cellBlock) throws IOException {
+    // Must calculate total size and write that first so other side can read 
it all in, in one
+    // swoop. This is dictated by how the server is currently written. Server 
needs to change
+    // if we are to be able to write without the length prefixing.
+    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
+    if (cellBlock != null) {
+      totalSize += cellBlock.remaining();
+    }
+    return write(dos, header, param, cellBlock, totalSize);
   }
 
-  /**
-   * Puts CellScanner Cells into a cell block using passed in 
<code>codec</code> and/or
-   * <code>compressor</code>.
-   * @param codec
-   * @param compressor
-   * @param cellScanner
-   * @param pool Pool of ByteBuffers to make use of. Can be null and then 
we'll allocate
-   * our own ByteBuffer.
-   * @return Null or byte buffer filled with a cellblock filled with passed-in 
Cells encoded using
-   * passed in <code>codec</code> and/or <code>compressor</code>; the returned 
buffer has been
-   * flipped and is ready for reading.  Use limit to find total size. If 
<code>pool</code> was not
-   * null, then this returned ByteBuffer came from there and should be 
returned to the pool when
-   * done.
-   * @throws IOException
-   */
-  @SuppressWarnings("resource")
-  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec 
compressor,
-    final CellScanner cellScanner, final BoundedByteBufferPool pool)
-  throws IOException {
-    if (cellScanner == null) return null;
-    if (codec == null) throw new CellScannerButNoCodecException();
-    int bufferSize = this.cellBlockBuildingInitialBufferSize;
-    ByteBufferOutputStream baos = null;
-    ByteBuffer bb = null;
-    if (pool != null) {
-      bb = pool.getBuffer();
-      bufferSize = bb.capacity();
-      baos = new ByteBufferOutputStream(bb);
-    } else {
-      // Then we need to make our own to return.
-      if (cellScanner instanceof HeapSize) {
-        long longSize = ((HeapSize)cellScanner).heapSize();
-        // Just make sure we don't have a size bigger than an int.
-        if (longSize > Integer.MAX_VALUE) {
-          throw new IOException("Size " + longSize + " > " + 
Integer.MAX_VALUE);
-        }
-        bufferSize = ClassSize.align((int)longSize);
-      }
-      baos = new ByteBufferOutputStream(bufferSize);
-    }
-    OutputStream os = baos;
-    Compressor poolCompressor = null;
-    try {
-      if (compressor != null) {
-        if (compressor instanceof Configurable) 
((Configurable)compressor).setConf(this.conf);
-        poolCompressor = CodecPool.getCompressor(compressor);
-        os = compressor.createOutputStream(os, poolCompressor);
-      }
-      Codec.Encoder encoder = codec.getEncoder(os);
-      int count = 0;
-      while (cellScanner.advance()) {
-        encoder.write(cellScanner.current());
-        count++;
-      }
-      encoder.flush();
-      // If no cells, don't mess around.  Just return null (could be a bunch 
of existence checking
-      // gets or something -- stuff that does not return a cell).
-      if (count == 0) {
-        if (pool != null && bb != null) {
-          pool.putBuffer(bb);
-        }
-        return null;
-      }
-    } catch (BufferOverflowException e) {
-      throw new DoNotRetryIOException(e);
-    } finally {
-      os.close();
-      if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
+  private static int write(final OutputStream dos, final Message header, final 
Message param,
+      final ByteBuffer cellBlock, final int totalSize) throws IOException {
+    // I confirmed toBytes does the same as DataOutputStream#writeInt.
+    dos.write(Bytes.toBytes(totalSize));
+    // This allocates a buffer that is the size of the message internally.
+    header.writeDelimitedTo(dos);
+    if (param != null) {
+      param.writeDelimitedTo(dos);
     }
-    if (LOG.isTraceEnabled()) {
-      if (bufferSize < baos.size()) {
-        LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " 
+ baos.size() +
-          "; up hbase.ipc.cellblock.building.initial.buffersize?");
-      }
+    if (cellBlock != null) {
+      dos.write(cellBlock.array(), 0, cellBlock.remaining());
     }
-    return baos.getByteBuffer();
+    dos.flush();
+    return totalSize;
   }
 
   /**
-   * @param codec
-   * @param cellBlock
-   * @return CellScanner to work against the content of <code>cellBlock</code>
-   * @throws IOException
+   * @return Size on the wire when the two messages are written with 
writeDelimitedTo
    */
-  public CellScanner createCellScanner(final Codec codec, final 
CompressionCodec compressor,
-      final byte [] cellBlock)
-  throws IOException {
-    return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock));
+  public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
+    int totalSize = 0;
+    for (Message m : messages) {
+      if (m == null) {
+        continue;
+      }
+      totalSize += m.getSerializedSize();
+      totalSize += 
CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
+    }
+    Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
+    return totalSize;
   }
 
-  /**
-   * @param codec
-   * @param cellBlock ByteBuffer containing the cells written by the Codec. 
The buffer should be
-   * position()'ed at the start of the cell block and limit()'ed at the end.
-   * @return CellScanner to work against the content of <code>cellBlock</code>
-   * @throws IOException
-   */
-  public CellScanner createCellScanner(final Codec codec, final 
CompressionCodec compressor,
-      final ByteBuffer cellBlock)
-  throws IOException {
-    // If compressed, decompress it first before passing it on else we will 
leak compression
-    // resources if the stream is not closed properly after we let it out.
-    InputStream is = null;
-    if (compressor != null) {
-      // GZIPCodec fails w/ NPE if no configuration.
-      if (compressor instanceof Configurable) 
((Configurable)compressor).setConf(this.conf);
-      Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
-      CompressionInputStream cis =
-        compressor.createInputStream(new ByteBufferInputStream(cellBlock), 
poolDecompressor);
-      ByteBufferOutputStream bbos = null;
-      try {
-        // TODO: This is ugly.  The buffer will be resized on us if we guess 
wrong.
-        // TODO: Reuse buffers.
-        bbos = new ByteBufferOutputStream(cellBlock.remaining() *
-          this.cellBlockDecompressionMultiplier);
-        IOUtils.copy(cis, bbos);
-        bbos.close();
-        ByteBuffer bb = bbos.getByteBuffer();
-        is = new ByteBufferInputStream(bb);
-      } finally {
-        if (is != null) is.close();
-        if (bbos != null) bbos.close();
-
-        CodecPool.returnDecompressor(poolDecompressor);
-      }
-    } else {
-      is = new ByteBufferInputStream(cellBlock);
+  static RequestHeader buildRequestHeader(Call call, CellBlockMeta 
cellBlockMeta) {
+    RequestHeader.Builder builder = RequestHeader.newBuilder();
+    builder.setCallId(call.id);
+    if (call.span != null) {
+      
builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
+          .setTraceId(call.span.getTraceId()));
+    }
+    builder.setMethodName(call.md.getName());
+    builder.setRequestParam(call.param != null);
+    if (cellBlockMeta != null) {
+      builder.setCellBlockMeta(cellBlockMeta);
+    }
+    // Only pass priority if there is one set.
+    if (call.priority != HBaseRpcController.PRIORITY_UNSET) {
+      builder.setPriority(call.priority);
     }
-    return codec.getDecoder(is);
+    builder.setTimeout(call.timeout);
+
+    return builder.build();
   }
 
   /**
-   * @param m Message to serialize delimited; i.e. w/ a vint of its size 
preceeding its
-   * serialization.
-   * @return The passed in Message serialized with delimiter.  Return null if 
<code>m</code> is null
-   * @throws IOException
+   * @param e exception to be wrapped
+   * @return RemoteException made from passed <code>e</code>
    */
-  public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) 
throws IOException {
-    if (m == null) return null;
-    int serializedSize = m.getSerializedSize();
-    int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
-    byte [] buffer = new byte[serializedSize + vintSize];
-    // Passing in a byte array saves COS creating a buffer which it does when 
using streams.
-    CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
-    // This will write out the vint preamble and the message serialized.
-    cos.writeMessageNoTag(m);
-    cos.flush();
-    cos.checkNoSpaceLeft();
-    return ByteBuffer.wrap(buffer);
+  static RemoteException createRemoteException(final ExceptionResponse e) {
+    String innerExceptionClassName = e.getExceptionClassName();
+    boolean doNotRetry = e.getDoNotRetry();
+    return e.hasHostname() ?
+    // If a hostname then add it to the RemoteWithExtrasException
+        new RemoteWithExtrasException(innerExceptionClassName, 
e.getStackTrace(), e.getHostname(),
+            e.getPort(), doNotRetry)
+        : new RemoteWithExtrasException(innerExceptionClassName, 
e.getStackTrace(), doNotRetry);
   }
 
   /**
-   * Write out header, param, and cell block if there is one.
-   * @param dos
-   * @param header
-   * @param param
-   * @param cellBlock
-   * @return Total number of bytes written.
-   * @throws IOException
+   * @return True if the exception is a fatal connection exception.
    */
-  public static int write(final OutputStream dos, final Message header, final 
Message param,
-      final ByteBuffer cellBlock)
-  throws IOException {
-    // Must calculate total size and write that first so other side can read 
it all in in one
-    // swoop.  This is dictated by how the server is currently written.  
Server needs to change
-    // if we are to be able to write without the length prefixing.
-    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
-    if (cellBlock != null) totalSize += cellBlock.remaining();
-    return write(dos, header, param, cellBlock, totalSize);
+  static boolean isFatalConnectionException(final ExceptionResponse e) {
+    return 
e.getExceptionClassName().equals(FatalConnectionException.class.getName());
   }
 
-  private static int write(final OutputStream dos, final Message header, final 
Message param,
-    final ByteBuffer cellBlock, final int totalSize)
-  throws IOException {
-    // I confirmed toBytes does same as DataOutputStream#writeInt.
-    dos.write(Bytes.toBytes(totalSize));
-    // This allocates a buffer that is the size of the message internally.
-    header.writeDelimitedTo(dos);
-    if (param != null) param.writeDelimitedTo(dos);
-    if (cellBlock != null) dos.write(cellBlock.array(), 0, 
cellBlock.remaining());
-    dos.flush();
-    return totalSize;
+  static IOException toIOE(Throwable t) {
+    if (t instanceof IOException) {
+      return (IOException) t;
+    } else {
+      return new IOException(t);
+    }
   }
 
   /**
-   * Read in chunks of 8K (HBASE-7239)
-   * @param in
-   * @param dest
-   * @param offset
-   * @param len
-   * @throws IOException
+   * Takes an Exception and the address we were trying to connect to and 
return an IOException with
+   * the input exception as the cause. The new exception provides the stack 
trace of the place where
+   * the exception is thrown and some extra diagnostics information. If the 
exception is
+   * ConnectException or SocketTimeoutException, return a new one of the same 
type; Otherwise return
+   * an IOException.
+   * @param addr target address
+   * @param exception the relevant exception
+   * @return an exception to throw
    */
-  public static void readChunked(final DataInput in, byte[] dest, int offset, 
int len)
-      throws IOException {
-    int maxRead = 8192;
-
-    for (; offset < len; offset += maxRead) {
-      in.readFully(dest, offset, Math.min(len - offset, maxRead));
+  static IOException wrapException(InetSocketAddress addr, Exception 
exception) {
+    if (exception instanceof ConnectException) {
+      // connection refused; include the host:port in the error
+      return (ConnectException) new ConnectException(
+          "Call to " + addr + " failed on connection exception: " + 
exception).initCause(exception);
+    } else if (exception instanceof SocketTimeoutException) {
+      return (SocketTimeoutException) new SocketTimeoutException(
+          "Call to " + addr + " failed because " + 
exception).initCause(exception);
+    } else if (exception instanceof ConnectionClosingException) {
+      return (ConnectionClosingException) new ConnectionClosingException(
+          "Call to " + addr + " failed on local exception: " + 
exception).initCause(exception);
+    } else if (exception instanceof ServerTooBusyException) {
+      // we already have address in the exception message
+      return (IOException) exception;
+    } else if (exception instanceof DoNotRetryIOException) {
+      return (IOException) new DoNotRetryIOException(
+          "Call to " + addr + " failed on local exception: " + 
exception).initCause(exception);
+    } else {
+      return (IOException) new IOException(
+          "Call to " + addr + " failed on local exception: " + 
exception).initCause(exception);
     }
   }
 
-  /**
-   * @return Size on the wire when the two messages are written with 
writeDelimitedTo
-   */
-  public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
-    int totalSize = 0;
-    for (Message m: messages) {
-      if (m == null) continue;
-      totalSize += m.getSerializedSize();
-      totalSize += 
CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
-    }
-    Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
-    return totalSize;
+  static void setCancelled(Call call) {
+    call.setException(new CallCancelledException("Call id=" + call.id + ", 
waitTime=" +
+        (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", 
rpcTimeout=" +
+        call.timeout));
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
index 7bb27e9..de2c8de 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
@@ -18,23 +18,21 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
 
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-
 /**
  * Provides clients with an RPC connection to call coprocessor endpoint {@link 
com.google.protobuf.Service}s
  * against the active master.  An instance of this class may be obtained

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
new file mode 100644
index 0000000..cde453f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Netty client for the requests and responses.
+ */
[email protected](HBaseInterfaceAudience.CONFIG)
+public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
+
+  final EventLoopGroup group;
+
+  final Class<? extends Channel> channelClass;
+
+  private final boolean shutdownGroupWhenClose;
+
+  public NettyRpcClient(Configuration configuration, String clusterId, 
SocketAddress localAddress,
+      MetricsConnection metrics) {
+    super(configuration, clusterId, localAddress, metrics);
+    Pair<EventLoopGroup, Class<? extends Channel>> groupAndChannelClass = 
NettyRpcClientConfigHelper
+        .getEventLoopConfig(conf);
+    if (groupAndChannelClass == null) {
+      // Use our own EventLoopGroup.
+      this.group = new NioEventLoopGroup(0,
+          new DefaultThreadFactory("IPC-NioEventLoopGroup", true, 
Thread.MAX_PRIORITY));
+      this.channelClass = NioSocketChannel.class;
+      this.shutdownGroupWhenClose = true;
+    } else {
+      this.group = groupAndChannelClass.getFirst();
+      this.channelClass = groupAndChannelClass.getSecond();
+      this.shutdownGroupWhenClose = false;
+    }
+  }
+
+  /** Used in test only. */
+  NettyRpcClient(Configuration configuration) {
+    this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
+  }
+
+  @Override
+  protected NettyRpcConnection createConnection(ConnectionId remoteId) throws 
IOException {
+    return new NettyRpcConnection(this, remoteId);
+  }
+
+  @Override
+  protected void closeInternal() {
+    if (shutdownGroupWhenClose) {
+      group.shutdownGracefully();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientConfigHelper.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientConfigHelper.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientConfigHelper.java
new file mode 100644
index 0000000..a8af69c
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientConfigHelper.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Helper class for passing config to {@link NettyRpcClient}.
+ * <p>
+ * As hadoop Configuration cannot pass an Object directly, we need to find a 
way to pass the
+ * EventLoopGroup to {@code NettyRpcClient} if we want to use a single {@code 
EventLoopGroup} for
+ * the whole process.
+ */
[email protected]
[email protected]
+public class NettyRpcClientConfigHelper {
+
+  public static final String EVENT_LOOP_CONFIG = 
"hbase.rpc.client.event-loop.config";
+
+  private static final String CONFIG_NAME = "global-event-loop";
+
+  private static final Map<String, Pair<EventLoopGroup, Class<? extends 
Channel>>>
+    EVENT_LOOP_CONFIG_MAP = new HashMap<>();
+
+  /**
+   * Set the EventLoopGroup and channel class for {@code NettyRpcClient}.
+   */
+  public static void setEventLoopConfig(Configuration conf, EventLoopGroup 
group,
+      Class<? extends Channel> channelClass) {
+    Preconditions.checkNotNull(group, "group is null");
+    Preconditions.checkNotNull(channelClass, "channel class is null");
+    conf.set(EVENT_LOOP_CONFIG, CONFIG_NAME);
+    EVENT_LOOP_CONFIG_MAP.put(CONFIG_NAME,
+      Pair.<EventLoopGroup, Class<? extends Channel>> newPair(group, 
channelClass));
+  }
+
+  /**
+   * The {@code NettyRpcClient} will create its own {@code NioEventLoopGroup}.
+   */
+  public static void createEventLoopPerClient(Configuration conf) {
+    conf.set(EVENT_LOOP_CONFIG, "");
+    EVENT_LOOP_CONFIG_MAP.clear();
+  }
+
+  static Pair<EventLoopGroup, Class<? extends Channel>> 
getEventLoopConfig(Configuration conf) {
+    String name = conf.get(EVENT_LOOP_CONFIG);
+    if (name == null) {
+      return DefaultNettyEventLoopConfig.GROUP_AND_CHANNEL_CLASS;
+    }
+    if (StringUtils.isBlank(name)) {
+      return null;
+    }
+    return EVENT_LOOP_CONFIG_MAP.get(name);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
new file mode 100644
index 0000000..9a90b09
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
+import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
+
+import com.google.protobuf.RpcCallback;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
+import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
+import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
+import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * RPC connection implementation based on netty.
+ * <p>
+ * Most operations are executed in handlers. Netty handler is always executed 
in the same
+ * thread(EventLoop) so no lock is needed.
+ */
[email protected]
+class NettyRpcConnection extends RpcConnection {
+
+  private static final Log LOG = LogFactory.getLog(NettyRpcConnection.class);
+
+  private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
+      
.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
+
+  private final NettyRpcClient rpcClient;
+
+  private ByteBuf connectionHeaderPreamble;
+
+  private ByteBuf connectionHeaderWithLength;
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"IS2_INCONSISTENT_SYNC",
+      justification = "connect is also under lock as notifyOnCancel will call 
our action directly")
+  private Channel channel;
+
+  NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws 
IOException {
+    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, 
rpcClient.clusterId,
+        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, 
rpcClient.compressor);
+    this.rpcClient = rpcClient;
+    byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
+    this.connectionHeaderPreamble = 
Unpooled.directBuffer(connectionHeaderPreamble.length)
+        .writeBytes(connectionHeaderPreamble);
+    ConnectionHeader header = getConnectionHeader();
+    this.connectionHeaderWithLength = Unpooled.directBuffer(4 + 
header.getSerializedSize());
+    this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
+    header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength));
+  }
+
+  @Override
+  protected synchronized void callTimeout(Call call) {
+    if (channel != null) {
+      channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
+    }
+  }
+
+  @Override
+  public synchronized boolean isActive() {
+    return channel != null;
+  }
+
+  private void shutdown0() {
+    if (channel != null) {
+      channel.close();
+      channel = null;
+    }
+  }
+
+  @Override
+  public synchronized void shutdown() {
+    shutdown0();
+  }
+
+  @Override
+  public synchronized void cleanupConnection() {
+    if (connectionHeaderPreamble != null) {
+      ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
+    }
+    if (connectionHeaderWithLength != null) {
+      ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
+    }
+  }
+
+  private void established(Channel ch) {
+    ch.write(connectionHeaderWithLength.retainedDuplicate());
+    ChannelPipeline p = ch.pipeline();
+    String addBeforeHandler = 
p.context(BufferCallBeforeInitHandler.class).name();
+    p.addBefore(addBeforeHandler, null,
+      new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, 
TimeUnit.MILLISECONDS));
+    p.addBefore(addBeforeHandler, null, new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
+    p.addBefore(addBeforeHandler, null,
+      new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, 
compressor));
+    p.fireUserEventTriggered(BufferCallEvent.success());
+  }
+
+  private boolean reloginInProgress;
+
+  private void scheduleRelogin(Throwable error) {
+    if (error instanceof FallbackDisallowedException) {
+      return;
+    }
+    synchronized (this) {
+      if (reloginInProgress) {
+        return;
+      }
+      reloginInProgress = true;
+      RELOGIN_EXECUTOR.schedule(new Runnable() {
+
+        @Override
+        public void run() {
+          try {
+            if (shouldAuthenticateOverKrb()) {
+              relogin();
+            }
+          } catch (IOException e) {
+            LOG.warn("relogin failed", e);
+          }
+          synchronized (this) {
+            reloginInProgress = false;
+          }
+        }
+      }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), 
TimeUnit.MILLISECONDS);
+    }
+  }
+
+  private void failInit(Channel ch, IOException e) {
+    synchronized (this) {
+      // fail all pending calls
+      ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
+      shutdown0();
+      return;
+    }
+  }
+
+  private void saslNegotiate(final Channel ch) {
+    UserGroupInformation ticket = getUGI();
+    if (ticket == null) {
+      failInit(ch, new FatalConnectionException("ticket/user is null"));
+      return;
+    }
+    Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
+    ChannelHandler saslHandler;
+    try {
+      saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, 
authMethod, token,
+          serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf.get(
+            "hbase.rpc.protection", 
QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
+    } catch (IOException e) {
+      failInit(ch, e);
+      return;
+    }
+    ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler);
+    saslPromise.addListener(new FutureListener<Boolean>() {
+
+      @Override
+      public void operationComplete(Future<Boolean> future) throws Exception {
+        if (future.isSuccess()) {
+          ChannelPipeline p = ch.pipeline();
+          p.remove(SaslChallengeDecoder.class);
+          p.remove(NettyHBaseSaslRpcClientHandler.class);
+          established(ch);
+        } else {
+          final Throwable error = future.cause();
+          scheduleRelogin(error);
+          failInit(ch, toIOE(error));
+        }
+      }
+    });
+  }
+
+  private void connect() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connecting to " + remoteId.address);
+    }
+
+    this.channel = new 
Bootstrap().group(rpcClient.group).channel(rpcClient.channelClass)
+        .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
+        .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
+        .handler(new 
BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
+        .remoteAddress(remoteId.address).connect().addListener(new 
ChannelFutureListener() {
+
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception 
{
+            Channel ch = future.channel();
+            if (!future.isSuccess()) {
+              failInit(ch, toIOE(future.cause()));
+              rpcClient.failedServers.addToFailedServers(remoteId.address);
+              return;
+            }
+            ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
+            if (useSasl) {
+              saslNegotiate(ch);
+            } else {
+              established(ch);
+            }
+          }
+        }).channel();
+  }
+
+  @Override
+  public synchronized void sendRequest(final Call call, HBaseRpcController hrc)
+      throws IOException {
+    if (reloginInProgress) {
+      throw new IOException("Can not send request because relogin is in 
progress.");
+    }
+    hrc.notifyOnCancel(new RpcCallback<Object>() {
+
+      @Override
+      public void run(Object parameter) {
+        setCancelled(call);
+        synchronized (this) {
+          if (channel != null) {
+            channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, 
call));
+          }
+        }
+      }
+    }, new CancellationCallback() {
+
+      @Override
+      public void run(boolean cancelled) throws IOException {
+        if (cancelled) {
+          setCancelled(call);
+        } else {
+          if (channel == null) {
+            connect();
+          }
+          scheduleTimeoutTask(call);
+          channel.writeAndFlush(call).addListener(new ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture future) throws 
Exception {
+              // Fail the call if we failed to write it out. This usually 
because the channel is
+              // closed. This is needed because we may shutdown the channel 
inside event loop and
+              // there may still be some pending calls in the event loop queue 
after us.
+              if (!future.isSuccess()) {
+                call.setException(toIOE(future.cause()));
+              }
+            }
+          });
+        }
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
new file mode 100644
index 0000000..5faaede
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.TextFormat;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.concurrent.PromiseCombiner;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * The netty rpc handler.
+ */
@InterfaceAudience.Private
class NettyRpcDuplexHandler extends ChannelDuplexHandler {

  private static final Log LOG = LogFactory.getLog(NettyRpcDuplexHandler.class);

  // Owning connection; shut down when this channel dies or goes idle.
  private final NettyRpcConnection conn;

  private final CellBlockBuilder cellBlockBuilder;

  private final Codec codec;

  private final CompressionCodec compressor;

  // Outstanding calls keyed by call id. Only touched from the channel's event loop,
  // so a plain HashMap is safe here.
  private final Map<Integer, Call> id2Call = new HashMap<Integer, Call>();

  public NettyRpcDuplexHandler(NettyRpcConnection conn, CellBlockBuilder cellBlockBuilder,
      Codec codec, CompressionCodec compressor) {
    this.conn = conn;
    this.cellBlockBuilder = cellBlockBuilder;
    this.codec = codec;
    this.compressor = compressor;

  }

  /**
   * Serialize one call onto the wire: 4-byte total length, delimited RequestHeader,
   * delimited param message, then the optional cell block.
   */
  private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise promise)
      throws IOException {
    id2Call.put(call.id, call);
    ByteBuf cellBlock = cellBlockBuilder.buildCellBlock(codec, compressor, call.cells, ctx.alloc());
    CellBlockMeta cellBlockMeta;
    if (cellBlock != null) {
      CellBlockMeta.Builder cellBlockMetaBuilder = CellBlockMeta.newBuilder();
      cellBlockMetaBuilder.setLength(cellBlock.writerIndex());
      cellBlockMeta = cellBlockMetaBuilder.build();
    } else {
      cellBlockMeta = null;
    }
    RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, cellBlockMeta);
    int sizeWithoutCellBlock = IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param);
    int totalSize = cellBlock != null ? sizeWithoutCellBlock + cellBlock.writerIndex()
        : sizeWithoutCellBlock;
    ByteBuf buf = ctx.alloc().buffer(sizeWithoutCellBlock + 4);
    buf.writeInt(totalSize);
    ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
    requestHeader.writeDelimitedTo(bbos);
    if (call.param != null) {
      call.param.writeDelimitedTo(bbos);
    }
    if (cellBlock != null) {
      // Two writes (header+param, then cell block); combine their promises so the
      // caller's promise completes only when both are written.
      ChannelPromise withoutCellBlockPromise = ctx.newPromise();
      ctx.write(buf, withoutCellBlockPromise);
      ChannelPromise cellBlockPromise = ctx.newPromise();
      ctx.write(cellBlock, cellBlockPromise);
      PromiseCombiner combiner = new PromiseCombiner();
      combiner.addAll(withoutCellBlockPromise, cellBlockPromise);
      combiner.finish(promise);
    } else {
      ctx.write(buf, promise);
    }
  }

  @Override
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
      throws Exception {
    if (msg instanceof Call) {
      writeRequest(ctx, (Call) msg, promise);
    } else {
      // Not a Call; pass through untouched.
      ctx.write(msg, promise);
    }
  }

  /**
   * Decode one full response frame (the frame decoder guarantees buf holds a whole
   * response) and complete the matching outstanding call.
   */
  private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException {
    int totalSize = buf.readInt();
    ByteBufInputStream in = new ByteBufInputStream(buf);
    ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
    int id = responseHeader.getCallId();
    if (LOG.isTraceEnabled()) {
      LOG.trace("got response header " + TextFormat.shortDebugString(responseHeader)
          + ", totalSize: " + totalSize + " bytes");
    }
    RemoteException remoteExc;
    if (responseHeader.hasException()) {
      ExceptionResponse exceptionResponse = responseHeader.getException();
      remoteExc = IPCUtil.createRemoteException(exceptionResponse);
      if (IPCUtil.isFatalConnectionException(exceptionResponse)) {
        // Here we will cleanup all calls so do not need to fall back, just return.
        exceptionCaught(ctx, remoteExc);
        return;
      }
    } else {
      remoteExc = null;
    }
    Call call = id2Call.remove(id);
    if (call == null) {
      // So we got a response for which we have no corresponding 'call' here on the client-side.
      // We probably timed out waiting, cleaned up all references, and now the server decides
      // to return a response. There is nothing we can do w/ the response at this stage. Clean
      // out the wire of the response so its out of the way and we can get other responses on
      // this connection.
      // NOTE(review): the leftover bytes are not explicitly skipped here; presumably the whole
      // frame buf is released by channelRead regardless — confirm the frame decoder semantics.
      int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
      int whatIsLeftToRead = totalSize - readSoFar;
      if (LOG.isDebugEnabled()) {
        LOG.debug("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead
            + " bytes");
      }
      return;
    }
    if (remoteExc != null) {
      call.setException(remoteExc);
      return;
    }
    Message value;
    if (call.responseDefaultType != null) {
      Builder builder = call.responseDefaultType.newBuilderForType();
      builder.mergeDelimitedFrom(in);
      value = builder.build();
    } else {
      value = null;
    }
    CellScanner cellBlockScanner;
    if (responseHeader.hasCellBlockMeta()) {
      int size = responseHeader.getCellBlockMeta().getLength();
      // Maybe we could read directly from the ByteBuf.
      // The problem here is that we do not know when to release it.
      byte[] cellBlock = new byte[size];
      buf.readBytes(cellBlock);
      cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
    } else {
      cellBlockScanner = null;
    }
    call.setResponse(value, cellBlockScanner);
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
      ByteBuf buf = (ByteBuf) msg;
      try {
        readResponse(ctx, buf);
      } finally {
        // We own the inbound buffer once we consume it; always release.
        buf.release();
      }
    } else {
      super.channelRead(ctx, msg);
    }
  }

  // Fail every outstanding call with the given error and forget them.
  private void cleanupCalls(ChannelHandlerContext ctx, IOException error) {
    for (Call call : id2Call.values()) {
      call.setException(error);
    }
    id2Call.clear();
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    if (!id2Call.isEmpty()) {
      cleanupCalls(ctx, new IOException("Connection closed"));
    }
    conn.shutdown();
    ctx.fireChannelInactive();
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    if (!id2Call.isEmpty()) {
      cleanupCalls(ctx, IPCUtil.toIOE(cause));
    }
    conn.shutdown();
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
      IdleStateEvent idleEvt = (IdleStateEvent) evt;
      switch (idleEvt.state()) {
        case WRITER_IDLE:
          // Only close when there are no outstanding calls; an idle writer with
          // pending calls means we are still waiting on responses.
          if (id2Call.isEmpty()) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("shutdown connection to " + conn.remoteId().address
                  + " because idle for a long time");
            }
            // It may happen that there are still some pending calls in the event loop queue and
            // they will get a closed channel exception. But this is not a big deal as it
            // rarely happens and the upper layer could retry immediately.
            conn.shutdown();
          }
          break;
        default:
          LOG.warn("Unrecognized idle state " + idleEvt.state());
          break;
      }
    } else if (evt instanceof CallEvent) {
      // just remove the call for now until we add other call event other than timeout and cancel.
      id2Call.remove(((CallEvent) evt).call.id);
    } else {
      ctx.fireUserEventTriggered(evt);
    }
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
deleted file mode 100644
index 82634e5..0000000
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.util.List;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-
-/**
- * Optionally carries Cells across the proxy/service interface down into ipc. 
On its
- * way out it optionally carries a set of result Cell data.  We stick the 
Cells here when we want
- * to avoid having to protobuf them.  This class is used ferrying data across 
the proxy/protobuf
- * service chasm.  Used by client and server ipc'ing.
- */
[email protected]
-public class PayloadCarryingRpcController
-    extends TimeLimitedRpcController implements CellScannable {
-
-  public static final int PRIORITY_UNSET = -1;
-  /**
-   * Priority to set on this request.  Set it here in controller so available 
composing the
-   * request.  This is the ordained way of setting priorities going forward.  
We will be
-   * undoing the old annotation-based mechanism.
-   */
-  private int priority = PRIORITY_UNSET;
-
-  /**
-   * They are optionally set on construction, cleared after we make the call, 
and then optionally
-   * set on response with the result. We use this lowest common denominator 
access to Cells because
-   * sometimes the scanner is backed by a List of Cells and other times, it is 
backed by an
-   * encoded block that implements CellScanner.
-   */
-  private CellScanner cellScanner;
-
-  public PayloadCarryingRpcController() {
-    this((CellScanner)null);
-  }
-
-  public PayloadCarryingRpcController(final CellScanner cellScanner) {
-    this.cellScanner = cellScanner;
-  }
-
-  public PayloadCarryingRpcController(final List<CellScannable> cellIterables) 
{
-    this.cellScanner = cellIterables == null? null: 
CellUtil.createCellScanner(cellIterables);
-  }
-
-  /**
-   * @return One-shot cell scanner (you cannot back it up and restart)
-   */
-  @Override
-  public CellScanner cellScanner() {
-    return cellScanner;
-  }
-
-  public void setCellScanner(final CellScanner cellScanner) {
-    this.cellScanner = cellScanner;
-  }
-
-  /**
-   * @param priority Priority for this request; should fall roughly in the 
range
-   * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS}
-   */
-  public void setPriority(int priority) {
-    this.priority = priority;
-  }
-
-  /**
-   * @param tn Set priority based off the table we are going against.
-   */
-  public void setPriority(final TableName tn) {
-    this.priority =
-        (tn != null && tn.isSystemTable())? HConstants.SYSTEMTABLE_QOS: 
HConstants.NORMAL_QOS;
-  }
-
-  /**
-   * @return The priority of this request
-   */
-  public int getPriority() {
-    return priority;
-  }
-
-  @Override public void reset() {
-    super.reset();
-    priority = 0;
-    cellScanner = null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
index 3ceecfc..0052423 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
@@ -18,15 +18,16 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
@@ -35,10 +36,6 @@ import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-
 /**
  * Provides clients with an RPC connection to call coprocessor endpoint {@link 
com.google.protobuf.Service}s
  * against a given table region.  An instance of this class may be obtained
@@ -90,11 +87,10 @@ public class RegionCoprocessorRpcChannel extends 
CoprocessorRpcChannel{
         new RegionServerCallable<CoprocessorServiceResponse>(connection, 
table, row) {
       @Override
       public CoprocessorServiceResponse call(int callTimeout) throws Exception 
{
-        if (rpcController instanceof PayloadCarryingRpcController) {
-          ((PayloadCarryingRpcController) 
rpcController).setPriority(tableName);
-        }
-        if (rpcController instanceof TimeLimitedRpcController) {
-          ((TimeLimitedRpcController) 
rpcController).setCallTimeout(callTimeout);
+        if (rpcController instanceof HBaseRpcController) {
+          HBaseRpcController hrc = (HBaseRpcController) rpcController;
+          hrc.setPriority(tableName);
+          hrc.setCallTimeout(callTimeout);
         }
         byte[] regionName = getLocation().getRegionInfo().getRegionName();
         return ProtobufUtil.execService(rpcController, getStub(), call, 
regionName);

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index 540e224..26a5739 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -18,40 +18,43 @@
 package org.apache.hadoop.hbase.ipc;
 
 import com.google.protobuf.BlockingRpcChannel;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.security.User;
+import com.google.protobuf.RpcChannel;
 
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.security.User;
+
 /**
  * Interface for RpcClient implementations so ConnectionManager can handle it.
  */
[email protected] public interface RpcClient extends Closeable {
-  public final static String FAILED_SERVER_EXPIRY_KEY = 
"hbase.ipc.client.failed.servers.expiry";
-  public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
-  public final static String IDLE_TIME = 
"hbase.ipc.client.connection.minIdleTimeBeforeClose";
-  public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY =
[email protected]
+public interface RpcClient extends Closeable {
+  String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
+  int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
+  String IDLE_TIME = "hbase.ipc.client.connection.minIdleTimeBeforeClose";
+  String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY =
       "hbase.ipc.client.fallback-to-simple-auth-allowed";
-  public static final boolean 
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
-  public static final String SPECIFIC_WRITE_THREAD = 
"hbase.ipc.client.specificThreadForWriting";
-  public static final String DEFAULT_CODEC_CLASS = 
"hbase.client.default.rpc.codec";
+  boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
+  String SPECIFIC_WRITE_THREAD = "hbase.ipc.client.specificThreadForWriting";
+  String DEFAULT_CODEC_CLASS = "hbase.client.default.rpc.codec";
 
-  public final static String SOCKET_TIMEOUT_CONNECT = 
"hbase.ipc.client.socket.timeout.connect";
+  String SOCKET_TIMEOUT_CONNECT = "hbase.ipc.client.socket.timeout.connect";
   /**
    * How long we wait when we wait for an answer. It's not the operation time, 
it's the time
    * we wait when we start to receive an answer, when the remote write starts 
to send the data.
    */
-  public final static String SOCKET_TIMEOUT_READ = 
"hbase.ipc.client.socket.timeout.read";
-  public final static String SOCKET_TIMEOUT_WRITE = 
"hbase.ipc.client.socket.timeout.write";
-  public final static int DEFAULT_SOCKET_TIMEOUT_CONNECT = 10000; // 10 seconds
-  public final static int DEFAULT_SOCKET_TIMEOUT_READ = 20000; // 20 seconds
-  public final static int DEFAULT_SOCKET_TIMEOUT_WRITE = 60000; // 60 seconds
+  String SOCKET_TIMEOUT_READ = "hbase.ipc.client.socket.timeout.read";
+  String SOCKET_TIMEOUT_WRITE = "hbase.ipc.client.socket.timeout.write";
+  int DEFAULT_SOCKET_TIMEOUT_CONNECT = 10000; // 10 seconds
+  int DEFAULT_SOCKET_TIMEOUT_READ = 20000; // 20 seconds
+  int DEFAULT_SOCKET_TIMEOUT_WRITE = 60000; // 60 seconds
 
   // Used by the server, for compatibility with old clients.
   // The client in 0.99+ does not ping the server.
-  final static int PING_CALL_ID = -1;
+  int PING_CALL_ID = -1;
 
   /**
    * Creates a "channel" that can be used by a blocking protobuf service.  
Useful setting up
@@ -64,8 +67,21 @@ import java.io.IOException;
    * @return A blocking rpc channel that goes via this rpc client instance.
    * @throws IOException when channel could not be created
    */
-  public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user,
-      int rpcTimeout) throws IOException;
+  BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int 
rpcTimeout)
+      throws IOException;
+
+  /**
+   * Creates a "channel" that can be used by a protobuf service.  Useful 
setting up
+   * protobuf stubs.
+   *
+   * @param sn server name describing location of server
+   * @param user which is to use the connection
+   * @param rpcTimeout default rpc operation timeout
+   *
+   * @return A rpc channel that goes via this rpc client instance.
+   */
+  RpcChannel createRpcChannel(final ServerName sn, final User user, int 
rpcTimeout)
+      throws IOException;
 
   /**
    * Interrupt the connections to the given server. This should be called if 
the server
@@ -76,13 +92,14 @@ import java.io.IOException;
    * safe exception.
    * @param sn server location to cancel connections of
    */
-  public void cancelConnections(ServerName sn);
+  void cancelConnections(ServerName sn);
 
   /**
    * Stop all threads related to this client.  No further calls may be made
    * using this client.
    */
-  @Override public void close();
+  @Override
+  void close();
 
   /**
    * @return true when this client uses a {@link 
org.apache.hadoop.hbase.codec.Codec} and so

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
index 8f45eb7..1f7a5a2 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
@@ -18,13 +18,15 @@
 package org.apache.hadoop.hbase.ipc;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+import java.net.SocketAddress;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
-import java.net.SocketAddress;
-
 /**
  * Factory to create a {@link org.apache.hadoop.hbase.ipc.RpcClient}
  */
@@ -33,6 +35,10 @@ public final class RpcClientFactory {
 
   public static final String CUSTOM_RPC_CLIENT_IMPL_CONF_KEY = 
"hbase.rpc.client.impl";
 
+  private static final ImmutableMap<String, String> DEPRECATED_NAME_MAPPING = 
ImmutableMap.of(
+    "org.apache.hadoop.hbase.ipc.RpcClientImpl", 
BlockingRpcClient.class.getName(),
+    "org.apache.hadoop.hbase.ipc.AsyncRpcClient", 
NettyRpcClient.class.getName());
+
   /**
    * Private Constructor
    */
@@ -58,6 +64,15 @@ public final class RpcClientFactory {
     return createClient(conf, clusterId, null, metrics);
   }
 
+  private static String getRpcClientClass(Configuration conf) {
+    String rpcClientClass = conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY);
+    if (rpcClientClass == null) {
+      return BlockingRpcClient.class.getName();
+    }
+    String mappedName = DEPRECATED_NAME_MAPPING.get(rpcClientClass);
+    return mappedName == null ? rpcClientClass : mappedName;
+  }
+
   /**
    * Creates a new RpcClient by the class defined in the configuration or 
falls back to
    * RpcClientImpl
@@ -69,14 +84,9 @@ public final class RpcClientFactory {
    */
   public static RpcClient createClient(Configuration conf, String clusterId,
       SocketAddress localAddr, MetricsConnection metrics) {
-    String rpcClientClass =
-        conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
-          RpcClientImpl.class.getName());
-    return ReflectionUtils.instantiateWithCustomCtor(
-        rpcClientClass,
-        new Class[] { Configuration.class, String.class, SocketAddress.class,
-            MetricsConnection.class },
-        new Object[] { conf, clusterId, localAddr, metrics }
-    );
+    String rpcClientClass = getRpcClientClass(conf);
+    return ReflectionUtils.instantiateWithCustomCtor(rpcClientClass, new 
Class[] {
+        Configuration.class, String.class, SocketAddress.class, 
MetricsConnection.class },
+      new Object[] { conf, clusterId, localAddr, metrics });
   }
 }
\ No newline at end of file

Reply via email to