http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index c238adb..4fa58ad 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -17,300 +17,173 @@ */ package org.apache.hadoop.hbase.ipc; -import java.io.DataInput; +import com.google.common.base.Preconditions; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.Message; + import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; -import java.nio.BufferOverflowException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; -import org.apache.commons.io.IOUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.io.BoundedByteBufferPool; -import org.apache.hadoop.hbase.io.ByteBufferInputStream; -import org.apache.hadoop.hbase.io.ByteBufferOutputStream; -import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.io.compress.CodecPool; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionInputStream; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.io.compress.Decompressor; - -import com.google.common.base.Preconditions; -import com.google.protobuf.CodedOutputStream; -import com.google.protobuf.Message; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.ipc.RemoteException; /** * Utility to help ipc'ing. */ @InterfaceAudience.Private -public class IPCUtil { - // LOG is being used in TestIPCUtil - public static final Log LOG = LogFactory.getLog(IPCUtil.class); - /** - * How much we think the decompressor will expand the original compressed content. - */ - private final int cellBlockDecompressionMultiplier; - private final int cellBlockBuildingInitialBufferSize; - private final Configuration conf; - - public IPCUtil(final Configuration conf) { - super(); - this.conf = conf; - this.cellBlockDecompressionMultiplier = - conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3); - - // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in - // #buildCellBlock. - this.cellBlockBuildingInitialBufferSize = - ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024)); - } +class IPCUtil { /** - * Thrown if a cellscanner but no codec to encode it with. - */ - public static class CellScannerButNoCodecException extends HBaseIOException {}; - - /** - * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or - * <code>compressor</code>. - * @param codec - * @param compressor - * @param cellScanner - * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using - * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been - * flipped and is ready for reading. Use limit to find total size. - * @throws IOException + * Write out header, param, and cell block if there is one. + * @param dos Stream to write into + * @param header to write + * @param param to write + * @param cellBlock to write + * @return Total number of bytes written. + * @throws IOException if write action fails */ - @SuppressWarnings("resource") - public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, - final CellScanner cellScanner) - throws IOException { - return buildCellBlock(codec, compressor, cellScanner, null); + public static int write(final OutputStream dos, final Message header, final Message param, + final ByteBuffer cellBlock) throws IOException { + // Must calculate total size and write that first so other side can read it all in in one + // swoop. This is dictated by how the server is currently written. Server needs to change + // if we are to be able to write without the length prefixing. + int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param); + if (cellBlock != null) { + totalSize += cellBlock.remaining(); + } + return write(dos, header, param, cellBlock, totalSize); } - /** - * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or - * <code>compressor</code>. - * @param codec - * @param compressor - * @param cellScanner - * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate - * our own ByteBuffer. - * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using - * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been - * flipped and is ready for reading. Use limit to find total size. If <code>pool</code> was not - * null, then this returned ByteBuffer came from there and should be returned to the pool when - * done. - * @throws IOException - */ - @SuppressWarnings("resource") - public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, - final CellScanner cellScanner, final BoundedByteBufferPool pool) - throws IOException { - if (cellScanner == null) return null; - if (codec == null) throw new CellScannerButNoCodecException(); - int bufferSize = this.cellBlockBuildingInitialBufferSize; - ByteBufferOutputStream baos = null; - ByteBuffer bb = null; - if (pool != null) { - bb = pool.getBuffer(); - bufferSize = bb.capacity(); - baos = new ByteBufferOutputStream(bb); - } else { - // Then we need to make our own to return. - if (cellScanner instanceof HeapSize) { - long longSize = ((HeapSize)cellScanner).heapSize(); - // Just make sure we don't have a size bigger than an int. - if (longSize > Integer.MAX_VALUE) { - throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE); - } - bufferSize = ClassSize.align((int)longSize); - } - baos = new ByteBufferOutputStream(bufferSize); - } - OutputStream os = baos; - Compressor poolCompressor = null; - try { - if (compressor != null) { - if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf); - poolCompressor = CodecPool.getCompressor(compressor); - os = compressor.createOutputStream(os, poolCompressor); - } - Codec.Encoder encoder = codec.getEncoder(os); - int count = 0; - while (cellScanner.advance()) { - encoder.write(cellScanner.current()); - count++; - } - encoder.flush(); - // If no cells, don't mess around. Just return null (could be a bunch of existence checking - // gets or something -- stuff that does not return a cell). - if (count == 0) { - if (pool != null && bb != null) { - pool.putBuffer(bb); - } - return null; - } - } catch (BufferOverflowException e) { - throw new DoNotRetryIOException(e); - } finally { - os.close(); - if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor); + private static int write(final OutputStream dos, final Message header, final Message param, + final ByteBuffer cellBlock, final int totalSize) throws IOException { + // I confirmed toBytes does same as DataOutputStream#writeInt. + dos.write(Bytes.toBytes(totalSize)); + // This allocates a buffer that is the size of the message internally. + header.writeDelimitedTo(dos); + if (param != null) { + param.writeDelimitedTo(dos); } - if (LOG.isTraceEnabled()) { - if (bufferSize < baos.size()) { - LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() + - "; up hbase.ipc.cellblock.building.initial.buffersize?"); - } + if (cellBlock != null) { + dos.write(cellBlock.array(), 0, cellBlock.remaining()); } - return baos.getByteBuffer(); + dos.flush(); + return totalSize; } /** - * @param codec - * @param cellBlock - * @return CellScanner to work against the content of <code>cellBlock</code> - * @throws IOException + * @return Size on the wire when the two messages are written with writeDelimitedTo */ - public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, - final byte [] cellBlock) - throws IOException { - return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock)); + public static int getTotalSizeWhenWrittenDelimited(Message... messages) { + int totalSize = 0; + for (Message m : messages) { + if (m == null) { + continue; + } + totalSize += m.getSerializedSize(); + totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize()); + } + Preconditions.checkArgument(totalSize < Integer.MAX_VALUE); + return totalSize; } - /** - * @param codec - * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be - * position()'ed at the start of the cell block and limit()'ed at the end. - * @return CellScanner to work against the content of <code>cellBlock</code> - * @throws IOException - */ - public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, - final ByteBuffer cellBlock) - throws IOException { - // If compressed, decompress it first before passing it on else we will leak compression - // resources if the stream is not closed properly after we let it out. - InputStream is = null; - if (compressor != null) { - // GZIPCodec fails w/ NPE if no configuration. - if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf); - Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); - CompressionInputStream cis = - compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor); - ByteBufferOutputStream bbos = null; - try { - // TODO: This is ugly. The buffer will be resized on us if we guess wrong. - // TODO: Reuse buffers. - bbos = new ByteBufferOutputStream(cellBlock.remaining() * - this.cellBlockDecompressionMultiplier); - IOUtils.copy(cis, bbos); - bbos.close(); - ByteBuffer bb = bbos.getByteBuffer(); - is = new ByteBufferInputStream(bb); - } finally { - if (is != null) is.close(); - if (bbos != null) bbos.close(); - - CodecPool.returnDecompressor(poolDecompressor); - } - } else { - is = new ByteBufferInputStream(cellBlock); + static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) { + RequestHeader.Builder builder = RequestHeader.newBuilder(); + builder.setCallId(call.id); + if (call.span != null) { + builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId()) + .setTraceId(call.span.getTraceId())); + } + builder.setMethodName(call.md.getName()); + builder.setRequestParam(call.param != null); + if (cellBlockMeta != null) { + builder.setCellBlockMeta(cellBlockMeta); + } + // Only pass priority if there is one set. + if (call.priority != HBaseRpcController.PRIORITY_UNSET) { + builder.setPriority(call.priority); } - return codec.getDecoder(is); + builder.setTimeout(call.timeout); + + return builder.build(); } /** - * @param m Message to serialize delimited; i.e. w/ a vint of its size preceeding its - * serialization. - * @return The passed in Message serialized with delimiter. Return null if <code>m</code> is null - * @throws IOException + * @param e exception to be wrapped + * @return RemoteException made from passed <code>e</code> */ - public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException { - if (m == null) return null; - int serializedSize = m.getSerializedSize(); - int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize); - byte [] buffer = new byte[serializedSize + vintSize]; - // Passing in a byte array saves COS creating a buffer which it does when using streams. - CodedOutputStream cos = CodedOutputStream.newInstance(buffer); - // This will write out the vint preamble and the message serialized. - cos.writeMessageNoTag(m); - cos.flush(); - cos.checkNoSpaceLeft(); - return ByteBuffer.wrap(buffer); + static RemoteException createRemoteException(final ExceptionResponse e) { + String innerExceptionClassName = e.getExceptionClassName(); + boolean doNotRetry = e.getDoNotRetry(); + return e.hasHostname() ? + // If a hostname then add it to the RemoteWithExtrasException + new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(), + e.getPort(), doNotRetry) + : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry); } /** - * Write out header, param, and cell block if there is one. - * @param dos - * @param header - * @param param - * @param cellBlock - * @return Total number of bytes written. - * @throws IOException + * @return True if the exception is a fatal connection exception. */ - public static int write(final OutputStream dos, final Message header, final Message param, - final ByteBuffer cellBlock) - throws IOException { - // Must calculate total size and write that first so other side can read it all in in one - // swoop. This is dictated by how the server is currently written. Server needs to change - // if we are to be able to write without the length prefixing. - int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param); - if (cellBlock != null) totalSize += cellBlock.remaining(); - return write(dos, header, param, cellBlock, totalSize); + static boolean isFatalConnectionException(final ExceptionResponse e) { + return e.getExceptionClassName().equals(FatalConnectionException.class.getName()); } - private static int write(final OutputStream dos, final Message header, final Message param, - final ByteBuffer cellBlock, final int totalSize) - throws IOException { - // I confirmed toBytes does same as DataOutputStream#writeInt. - dos.write(Bytes.toBytes(totalSize)); - // This allocates a buffer that is the size of the message internally. - header.writeDelimitedTo(dos); - if (param != null) param.writeDelimitedTo(dos); - if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining()); - dos.flush(); - return totalSize; + static IOException toIOE(Throwable t) { + if (t instanceof IOException) { + return (IOException) t; + } else { + return new IOException(t); + } } /** - * Read in chunks of 8K (HBASE-7239) - * @param in - * @param dest - * @param offset - * @param len - * @throws IOException + * Takes an Exception and the address we were trying to connect to and return an IOException with + * the input exception as the cause. The new exception provides the stack trace of the place where + * the exception is thrown and some extra diagnostics information. If the exception is + * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return + * an IOException. + * @param addr target address + * @param exception the relevant exception + * @return an exception to throw */ - public static void readChunked(final DataInput in, byte[] dest, int offset, int len) - throws IOException { - int maxRead = 8192; - - for (; offset < len; offset += maxRead) { - in.readFully(dest, offset, Math.min(len - offset, maxRead)); + static IOException wrapException(InetSocketAddress addr, Exception exception) { + if (exception instanceof ConnectException) { + // connection refused; include the host:port in the error + return (ConnectException) new ConnectException( + "Call to " + addr + " failed on connection exception: " + exception).initCause(exception); + } else if (exception instanceof SocketTimeoutException) { + return (SocketTimeoutException) new SocketTimeoutException( + "Call to " + addr + " failed because " + exception).initCause(exception); + } else if (exception instanceof ConnectionClosingException) { + return (ConnectionClosingException) new ConnectionClosingException( + "Call to " + addr + " failed on local exception: " + exception).initCause(exception); + } else if (exception instanceof ServerTooBusyException) { + // we already have address in the exception message + return (IOException) exception; + } else if (exception instanceof DoNotRetryIOException) { + return (IOException) new DoNotRetryIOException( + "Call to " + addr + " failed on local exception: " + exception).initCause(exception); + } else { + return (IOException) new IOException( + "Call to " + addr + " failed on local exception: " + exception).initCause(exception); } } - /** - * @return Size on the wire when the two messages are written with writeDelimitedTo - */ - public static int getTotalSizeWhenWrittenDelimited(Message ... messages) { - int totalSize = 0; - for (Message m: messages) { - if (m == null) continue; - totalSize += m.getSerializedSize(); - totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize()); - } - Preconditions.checkArgument(totalSize < Integer.MAX_VALUE); - return totalSize; + static void setCancelled(Call call) { + call.setException(new CallCancelledException("Call id=" + call.id + ", waitTime=" + + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout=" + + call.timeout)); } }
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java index 7bb27e9..de2c8de 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java @@ -18,23 +18,21 @@ package org.apache.hadoop.hbase.ipc; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; + import java.io.IOException; -import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; - /** * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s * against the active master. An instance of this class may be obtained http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java new file mode 100644 index 0000000..cde453f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.DefaultThreadFactory; + +import java.io.IOException; +import java.net.SocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; +import org.apache.hadoop.hbase.util.Pair; + +/** + * Netty client for the requests and responses. + */ [email protected](HBaseInterfaceAudience.CONFIG) +public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> { + + final EventLoopGroup group; + + final Class<? extends Channel> channelClass; + + private final boolean shutdownGroupWhenClose; + + public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, + MetricsConnection metrics) { + super(configuration, clusterId, localAddress, metrics); + Pair<EventLoopGroup, Class<? extends Channel>> groupAndChannelClass = NettyRpcClientConfigHelper + .getEventLoopConfig(conf); + if (groupAndChannelClass == null) { + // Use our own EventLoopGroup. + this.group = new NioEventLoopGroup(0, + new DefaultThreadFactory("IPC-NioEventLoopGroup", true, Thread.MAX_PRIORITY)); + this.channelClass = NioSocketChannel.class; + this.shutdownGroupWhenClose = true; + } else { + this.group = groupAndChannelClass.getFirst(); + this.channelClass = groupAndChannelClass.getSecond(); + this.shutdownGroupWhenClose = false; + } + } + + /** Used in test only. */ + NettyRpcClient(Configuration configuration) { + this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null); + } + + @Override + protected NettyRpcConnection createConnection(ConnectionId remoteId) throws IOException { + return new NettyRpcConnection(this, remoteId); + } + + @Override + protected void closeInternal() { + if (shutdownGroupWhenClose) { + group.shutdownGracefully(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientConfigHelper.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientConfigHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientConfigHelper.java new file mode 100644 index 0000000..a8af69c --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientConfigHelper.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.common.base.Preconditions; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Pair; + +/** + * Helper class for passing config to {@link NettyRpcClient}. + * <p> + * As hadoop Configuration can not pass an Object directly, we need to find a way to pass the + * EventLoopGroup to {@code AsyncRpcClient} if we want to use a single {@code EventLoopGroup} for + * the whole process. + */ [email protected] [email protected] +public class NettyRpcClientConfigHelper { + + public static final String EVENT_LOOP_CONFIG = "hbase.rpc.client.event-loop.config"; + + private static final String CONFIG_NAME = "global-event-loop"; + + private static final Map<String, Pair<EventLoopGroup, Class<? extends Channel>>> + EVENT_LOOP_CONFIG_MAP = new HashMap<>(); + + /** + * Set the EventLoopGroup and channel class for {@code AsyncRpcClient}. + */ + public static void setEventLoopConfig(Configuration conf, EventLoopGroup group, + Class<? extends Channel> channelClass) { + Preconditions.checkNotNull(group, "group is null"); + Preconditions.checkNotNull(channelClass, "channel class is null"); + conf.set(EVENT_LOOP_CONFIG, CONFIG_NAME); + EVENT_LOOP_CONFIG_MAP.put(CONFIG_NAME, + Pair.<EventLoopGroup, Class<? extends Channel>> newPair(group, channelClass)); + } + + /** + * The {@code AsyncRpcClient} will create its own {@code NioEventLoopGroup}. + */ + public static void createEventLoopPerClient(Configuration conf) { + conf.set(EVENT_LOOP_CONFIG, ""); + EVENT_LOOP_CONFIG_MAP.clear(); + } + + static Pair<EventLoopGroup, Class<? extends Channel>> getEventLoopConfig(Configuration conf) { + String name = conf.get(EVENT_LOOP_CONFIG); + if (name == null) { + return DefaultNettyEventLoopConfig.GROUP_AND_CHANNEL_CLASS; + } + if (StringUtils.isBlank(name)) { + return null; + } + return EVENT_LOOP_CONFIG_MAP.get(name); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java new file mode 100644 index 0000000..9a90b09 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED; +import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT; +import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled; +import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; + +import com.google.protobuf.RpcCallback; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; + +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent; +import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; +import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler; +import org.apache.hadoop.hbase.security.SaslChallengeDecoder; +import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * RPC connection implementation based on netty. + * <p> + * Most operations are executed in handlers. Netty handler is always executed in the same + * thread(EventLoop) so no lock is needed. + */ [email protected] +class NettyRpcConnection extends RpcConnection { + + private static final Log LOG = LogFactory.getLog(NettyRpcConnection.class); + + private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors + .newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin")); + + private final NettyRpcClient rpcClient; + + private ByteBuf connectionHeaderPreamble; + + private ByteBuf connectionHeaderWithLength; + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", + justification = "connect is also under lock as notifyOnCancel will call our action directly") + private Channel channel; + + NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException { + super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, + rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor); + this.rpcClient = rpcClient; + byte[] connectionHeaderPreamble = getConnectionHeaderPreamble(); + this.connectionHeaderPreamble = Unpooled.directBuffer(connectionHeaderPreamble.length) + .writeBytes(connectionHeaderPreamble); + ConnectionHeader header = getConnectionHeader(); + this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize()); + this.connectionHeaderWithLength.writeInt(header.getSerializedSize()); + header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength)); + } + + @Override + protected synchronized void callTimeout(Call call) { + if (channel != null) { + channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call)); + } + } + + @Override + public synchronized boolean isActive() { + return channel != null; + } + + private void shutdown0() { + if (channel != null) { + channel.close(); + channel = null; + } + } + + @Override + public synchronized void shutdown() { + shutdown0(); + } + + @Override + public synchronized void cleanupConnection() { + if (connectionHeaderPreamble != null) { + ReferenceCountUtil.safeRelease(connectionHeaderPreamble); + } + if (connectionHeaderWithLength != null) { + ReferenceCountUtil.safeRelease(connectionHeaderWithLength); + } + } + + private void established(Channel ch) { + ch.write(connectionHeaderWithLength.retainedDuplicate()); + ChannelPipeline p = ch.pipeline(); + String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name(); + p.addBefore(addBeforeHandler, null, + new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS)); + p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4)); + p.addBefore(addBeforeHandler, null, + new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor)); + p.fireUserEventTriggered(BufferCallEvent.success()); + } + + private boolean reloginInProgress; + + private void scheduleRelogin(Throwable error) { + if (error instanceof FallbackDisallowedException) { + return; + } + synchronized (this) { + if (reloginInProgress) { + return; + } + reloginInProgress = true; + RELOGIN_EXECUTOR.schedule(new Runnable() { + + @Override + public void run() { + try { + if (shouldAuthenticateOverKrb()) { + relogin(); + } + } catch (IOException e) { + LOG.warn("relogin failed", e); + } + synchronized (this) { + reloginInProgress = false; + } + } + }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS); + } + } + + private void failInit(Channel ch, IOException e) { + synchronized (this) { + // fail all pending calls + ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e)); + shutdown0(); + return; + } + } + + private void saslNegotiate(final Channel ch) { + UserGroupInformation ticket = getUGI(); + if (ticket == null) { + failInit(ch, new FatalConnectionException("ticket/user is null")); + return; + } + Promise<Boolean> saslPromise = ch.eventLoop().newPromise(); + ChannelHandler saslHandler; + try { + saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token, + serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf.get( + "hbase.rpc.protection", QualityOfProtection.AUTHENTICATION.name().toLowerCase())); + } catch (IOException e) { + failInit(ch, e); + return; + } + ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler); + saslPromise.addListener(new FutureListener<Boolean>() { + + @Override + public void operationComplete(Future<Boolean> future) throws Exception { + if (future.isSuccess()) { + ChannelPipeline p = ch.pipeline(); + p.remove(SaslChallengeDecoder.class); + p.remove(NettyHBaseSaslRpcClientHandler.class); + established(ch); + } else { + final Throwable error = future.cause(); + scheduleRelogin(error); + failInit(ch, toIOE(error)); + } + } + }); + } + + private void connect() { + if (LOG.isDebugEnabled()) { + LOG.debug("Connecting to " + remoteId.address); + } + + this.channel = new Bootstrap().group(rpcClient.group).channel(rpcClient.channelClass) + .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) + .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO) + .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr) + .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + Channel ch = future.channel(); + if (!future.isSuccess()) { + failInit(ch, toIOE(future.cause())); + rpcClient.failedServers.addToFailedServers(remoteId.address); + return; + } + ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate()); + if (useSasl) { + saslNegotiate(ch); + } else { + established(ch); + } + } + }).channel(); + } + + @Override + public synchronized void sendRequest(final Call call, HBaseRpcController hrc) + throws IOException { + if (reloginInProgress) { + throw new IOException("Can not send request because relogin is in progress."); + } + hrc.notifyOnCancel(new RpcCallback<Object>() { + + @Override + public void run(Object parameter) { + setCancelled(call); + synchronized (this) { + if (channel != null) { + channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call)); + } + } + } + }, new CancellationCallback() { + + @Override + public void run(boolean cancelled) throws IOException { + if (cancelled) { + setCancelled(call); + } else { + if (channel == null) { + connect(); + } + scheduleTimeoutTask(call); + channel.writeAndFlush(call).addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + // Fail the call if we failed to write it out. This usually because the channel is + // closed. This is needed because we may shutdown the channel inside event loop and + // there may still be some pending calls in the event loop queue after us. + if (!future.isSuccess()) { + call.setException(toIOE(future.cause())); + } + } + }); + } + } + }); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java new file mode 100644 index 0000000..5faaede --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.Message; +import com.google.protobuf.Message.Builder; +import com.google.protobuf.TextFormat; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.concurrent.PromiseCombiner; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.ipc.RemoteException; + +/** + * The netty rpc handler. + */ [email protected] +class NettyRpcDuplexHandler extends ChannelDuplexHandler { + + private static final Log LOG = LogFactory.getLog(NettyRpcDuplexHandler.class); + + private final NettyRpcConnection conn; + + private final CellBlockBuilder cellBlockBuilder; + + private final Codec codec; + + private final CompressionCodec compressor; + + private final Map<Integer, Call> id2Call = new HashMap<Integer, Call>(); + + public NettyRpcDuplexHandler(NettyRpcConnection conn, CellBlockBuilder cellBlockBuilder, + Codec codec, CompressionCodec compressor) { + this.conn = conn; + this.cellBlockBuilder = cellBlockBuilder; + this.codec = codec; + this.compressor = compressor; + + } + + private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise promise) + throws IOException { + id2Call.put(call.id, call); + ByteBuf cellBlock = cellBlockBuilder.buildCellBlock(codec, compressor, call.cells, ctx.alloc()); + CellBlockMeta cellBlockMeta; + if (cellBlock != null) { + CellBlockMeta.Builder cellBlockMetaBuilder = CellBlockMeta.newBuilder(); + cellBlockMetaBuilder.setLength(cellBlock.writerIndex()); + cellBlockMeta = cellBlockMetaBuilder.build(); + } else { + cellBlockMeta = null; + } + RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, cellBlockMeta); + int sizeWithoutCellBlock = IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param); + int totalSize = cellBlock != null ? sizeWithoutCellBlock + cellBlock.writerIndex() + : sizeWithoutCellBlock; + ByteBuf buf = ctx.alloc().buffer(sizeWithoutCellBlock + 4); + buf.writeInt(totalSize); + ByteBufOutputStream bbos = new ByteBufOutputStream(buf); + requestHeader.writeDelimitedTo(bbos); + if (call.param != null) { + call.param.writeDelimitedTo(bbos); + } + if (cellBlock != null) { + ChannelPromise withoutCellBlockPromise = ctx.newPromise(); + ctx.write(buf, withoutCellBlockPromise); + ChannelPromise cellBlockPromise = ctx.newPromise(); + ctx.write(cellBlock, cellBlockPromise); + PromiseCombiner combiner = new PromiseCombiner(); + combiner.addAll(withoutCellBlockPromise, cellBlockPromise); + combiner.finish(promise); + } else { + ctx.write(buf, promise); + } + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + if (msg instanceof Call) { + writeRequest(ctx, (Call) msg, promise); + } else { + ctx.write(msg, promise); + } + } + + private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException { + int totalSize = buf.readInt(); + ByteBufInputStream in = new ByteBufInputStream(buf); + ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in); + int id = responseHeader.getCallId(); + if (LOG.isTraceEnabled()) { + LOG.trace("got response header " + TextFormat.shortDebugString(responseHeader) + + ", totalSize: " + totalSize + " bytes"); + } + RemoteException remoteExc; + if (responseHeader.hasException()) { + ExceptionResponse exceptionResponse = responseHeader.getException(); + remoteExc = IPCUtil.createRemoteException(exceptionResponse); + if (IPCUtil.isFatalConnectionException(exceptionResponse)) { + // Here we will cleanup all calls so do not need to fall back, just return. + exceptionCaught(ctx, remoteExc); + return; + } + } else { + remoteExc = null; + } + Call call = id2Call.remove(id); + if (call == null) { + // So we got a response for which we have no corresponding 'call' here on the client-side. + // We probably timed out waiting, cleaned up all references, and now the server decides + // to return a response. There is nothing we can do w/ the response at this stage. Clean + // out the wire of the response so its out of the way and we can get other responses on + // this connection. + int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); + int whatIsLeftToRead = totalSize - readSoFar; + if (LOG.isDebugEnabled()) { + LOG.debug("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead + + " bytes"); + } + return; + } + if (remoteExc != null) { + call.setException(remoteExc); + return; + } + Message value; + if (call.responseDefaultType != null) { + Builder builder = call.responseDefaultType.newBuilderForType(); + builder.mergeDelimitedFrom(in); + value = builder.build(); + } else { + value = null; + } + CellScanner cellBlockScanner; + if (responseHeader.hasCellBlockMeta()) { + int size = responseHeader.getCellBlockMeta().getLength(); + // Maybe we could read directly from the ByteBuf. + // The problem here is that we do not know when to release it. + byte[] cellBlock = new byte[size]; + buf.readBytes(cellBlock); + cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); + } else { + cellBlockScanner = null; + } + call.setResponse(value, cellBlockScanner); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + try { + readResponse(ctx, buf); + } finally { + buf.release(); + } + } else { + super.channelRead(ctx, msg); + } + } + + private void cleanupCalls(ChannelHandlerContext ctx, IOException error) { + for (Call call : id2Call.values()) { + call.setException(error); + } + id2Call.clear(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + if (!id2Call.isEmpty()) { + cleanupCalls(ctx, new IOException("Connection closed")); + } + conn.shutdown(); + ctx.fireChannelInactive(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!id2Call.isEmpty()) { + cleanupCalls(ctx, IPCUtil.toIOE(cause)); + } + conn.shutdown(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent idleEvt = (IdleStateEvent) evt; + switch (idleEvt.state()) { + case WRITER_IDLE: + if (id2Call.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("shutdown connection to " + conn.remoteId().address + + " because idle for a long time"); + } + // It may happen that there are still some pending calls in the event loop queue and + // they will get a closed channel exception. But this is not a big deal as it rarely + // rarely happens and the upper layer could retry immediately. + conn.shutdown(); + } + break; + default: + LOG.warn("Unrecognized idle state " + idleEvt.state()); + break; + } + } else if (evt instanceof CallEvent) { + // just remove the call for now until we add other call event other than timeout and cancel. + id2Call.remove(((CallEvent) evt).call.id); + } else { + ctx.fireUserEventTriggered(evt); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java deleted file mode 100644 index 82634e5..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import java.util.List; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.CellScannable; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; - -/** - * Optionally carries Cells across the proxy/service interface down into ipc. On its - * way out it optionally carries a set of result Cell data. We stick the Cells here when we want - * to avoid having to protobuf them. This class is used ferrying data across the proxy/protobuf - * service chasm. Used by client and server ipc'ing. - */ [email protected] -public class PayloadCarryingRpcController - extends TimeLimitedRpcController implements CellScannable { - - public static final int PRIORITY_UNSET = -1; - /** - * Priority to set on this request. Set it here in controller so available composing the - * request. This is the ordained way of setting priorities going forward. We will be - * undoing the old annotation-based mechanism. - */ - private int priority = PRIORITY_UNSET; - - /** - * They are optionally set on construction, cleared after we make the call, and then optionally - * set on response with the result. We use this lowest common denominator access to Cells because - * sometimes the scanner is backed by a List of Cells and other times, it is backed by an - * encoded block that implements CellScanner. - */ - private CellScanner cellScanner; - - public PayloadCarryingRpcController() { - this((CellScanner)null); - } - - public PayloadCarryingRpcController(final CellScanner cellScanner) { - this.cellScanner = cellScanner; - } - - public PayloadCarryingRpcController(final List<CellScannable> cellIterables) { - this.cellScanner = cellIterables == null? null: CellUtil.createCellScanner(cellIterables); - } - - /** - * @return One-shot cell scanner (you cannot back it up and restart) - */ - @Override - public CellScanner cellScanner() { - return cellScanner; - } - - public void setCellScanner(final CellScanner cellScanner) { - this.cellScanner = cellScanner; - } - - /** - * @param priority Priority for this request; should fall roughly in the range - * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS} - */ - public void setPriority(int priority) { - this.priority = priority; - } - - /** - * @param tn Set priority based off the table we are going against. - */ - public void setPriority(final TableName tn) { - this.priority = - (tn != null && tn.isSystemTable())? HConstants.SYSTEMTABLE_QOS: HConstants.NORMAL_QOS; - } - - /** - * @return The priority of this request - */ - public int getPriority() { - return priority; - } - - @Override public void reset() { - super.reset(); - priority = 0; - cellScanner = null; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index 3ceecfc..0052423 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -18,15 +18,16 @@ package org.apache.hadoop.hbase.ipc; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; + import java.io.IOException; -import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; @@ -35,10 +36,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; import org.apache.hadoop.hbase.util.Bytes; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; - /** * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s * against a given table region. An instance of this class may be obtained @@ -90,11 +87,10 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{ new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) { @Override public CoprocessorServiceResponse call(int callTimeout) throws Exception { - if (rpcController instanceof PayloadCarryingRpcController) { - ((PayloadCarryingRpcController) rpcController).setPriority(tableName); - } - if (rpcController instanceof TimeLimitedRpcController) { - ((TimeLimitedRpcController) rpcController).setCallTimeout(callTimeout); + if (rpcController instanceof HBaseRpcController) { + HBaseRpcController hrc = (HBaseRpcController) rpcController; + hrc.setPriority(tableName); + hrc.setCallTimeout(callTimeout); } byte[] regionName = getLocation().getRegionInfo().getRegionName(); return ProtobufUtil.execService(rpcController, getStub(), call, regionName); http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 540e224..26a5739 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -18,40 +18,43 @@ package org.apache.hadoop.hbase.ipc; import com.google.protobuf.BlockingRpcChannel; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.security.User; +import com.google.protobuf.RpcChannel; import java.io.Closeable; import java.io.IOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.security.User; + /** * Interface for RpcClient implementations so ConnectionManager can handle it. */ [email protected] public interface RpcClient extends Closeable { - public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry"; - public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000; - public final static String IDLE_TIME = "hbase.ipc.client.connection.minIdleTimeBeforeClose"; - public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = [email protected] +public interface RpcClient extends Closeable { + String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry"; + int FAILED_SERVER_EXPIRY_DEFAULT = 2000; + String IDLE_TIME = "hbase.ipc.client.connection.minIdleTimeBeforeClose"; + String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "hbase.ipc.client.fallback-to-simple-auth-allowed"; - public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false; - public static final String SPECIFIC_WRITE_THREAD = "hbase.ipc.client.specificThreadForWriting"; - public static final String DEFAULT_CODEC_CLASS = "hbase.client.default.rpc.codec"; + boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false; + String SPECIFIC_WRITE_THREAD = "hbase.ipc.client.specificThreadForWriting"; + String DEFAULT_CODEC_CLASS = "hbase.client.default.rpc.codec"; - public final static String SOCKET_TIMEOUT_CONNECT = "hbase.ipc.client.socket.timeout.connect"; + String SOCKET_TIMEOUT_CONNECT = "hbase.ipc.client.socket.timeout.connect"; /** * How long we wait when we wait for an answer. It's not the operation time, it's the time * we wait when we start to receive an answer, when the remote write starts to send the data. */ - public final static String SOCKET_TIMEOUT_READ = "hbase.ipc.client.socket.timeout.read"; - public final static String SOCKET_TIMEOUT_WRITE = "hbase.ipc.client.socket.timeout.write"; - public final static int DEFAULT_SOCKET_TIMEOUT_CONNECT = 10000; // 10 seconds - public final static int DEFAULT_SOCKET_TIMEOUT_READ = 20000; // 20 seconds - public final static int DEFAULT_SOCKET_TIMEOUT_WRITE = 60000; // 60 seconds + String SOCKET_TIMEOUT_READ = "hbase.ipc.client.socket.timeout.read"; + String SOCKET_TIMEOUT_WRITE = "hbase.ipc.client.socket.timeout.write"; + int DEFAULT_SOCKET_TIMEOUT_CONNECT = 10000; // 10 seconds + int DEFAULT_SOCKET_TIMEOUT_READ = 20000; // 20 seconds + int DEFAULT_SOCKET_TIMEOUT_WRITE = 60000; // 60 seconds // Used by the server, for compatibility with old clients. // The client in 0.99+ does not ping the server. - final static int PING_CALL_ID = -1; + int PING_CALL_ID = -1; /** * Creates a "channel" that can be used by a blocking protobuf service. Useful setting up @@ -64,8 +67,21 @@ import java.io.IOException; * @return A blocking rpc channel that goes via this rpc client instance. * @throws IOException when channel could not be created */ - public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, - int rpcTimeout) throws IOException; + BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) + throws IOException; + + /** + * Creates a "channel" that can be used by a protobuf service. Useful setting up + * protobuf stubs. + * + * @param sn server name describing location of server + * @param user which is to use the connection + * @param rpcTimeout default rpc operation timeout + * + * @return A rpc channel that goes via this rpc client instance. + */ + RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) + throws IOException; /** * Interrupt the connections to the given server. This should be called if the server @@ -76,13 +92,14 @@ import java.io.IOException; * safe exception. * @param sn server location to cancel connections of */ - public void cancelConnections(ServerName sn); + void cancelConnections(ServerName sn); /** * Stop all threads related to this client. No further calls may be made * using this client. */ - @Override public void close(); + @Override + void close(); /** * @return true when this client uses a {@link org.apache.hadoop.hbase.codec.Codec} and so http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java index 8f45eb7..1f7a5a2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java @@ -18,13 +18,15 @@ package org.apache.hadoop.hbase.ipc; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; + +import java.net.SocketAddress; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.util.ReflectionUtils; -import java.net.SocketAddress; - /** * Factory to create a {@link org.apache.hadoop.hbase.ipc.RpcClient} */ @@ -33,6 +35,10 @@ public final class RpcClientFactory { public static final String CUSTOM_RPC_CLIENT_IMPL_CONF_KEY = "hbase.rpc.client.impl"; + private static final ImmutableMap<String, String> DEPRECATED_NAME_MAPPING = ImmutableMap.of( + "org.apache.hadoop.hbase.ipc.RpcClientImpl", BlockingRpcClient.class.getName(), + "org.apache.hadoop.hbase.ipc.AsyncRpcClient", NettyRpcClient.class.getName()); + /** * Private Constructor */ @@ -58,6 +64,15 @@ public final class RpcClientFactory { return createClient(conf, clusterId, null, metrics); } + private static String getRpcClientClass(Configuration conf) { + String rpcClientClass = conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY); + if (rpcClientClass == null) { + return BlockingRpcClient.class.getName(); + } + String mappedName = DEPRECATED_NAME_MAPPING.get(rpcClientClass); + return mappedName == null ? rpcClientClass : mappedName; + } + /** * Creates a new RpcClient by the class defined in the configuration or falls back to * RpcClientImpl @@ -69,14 +84,9 @@ public final class RpcClientFactory { */ public static RpcClient createClient(Configuration conf, String clusterId, SocketAddress localAddr, MetricsConnection metrics) { - String rpcClientClass = - conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, - RpcClientImpl.class.getName()); - return ReflectionUtils.instantiateWithCustomCtor( - rpcClientClass, - new Class[] { Configuration.class, String.class, SocketAddress.class, - MetricsConnection.class }, - new Object[] { conf, clusterId, localAddr, metrics } - ); + String rpcClientClass = getRpcClientClass(conf); + return ReflectionUtils.instantiateWithCustomCtor(rpcClientClass, new Class[] { + Configuration.class, String.class, SocketAddress.class, MetricsConnection.class }, + new Object[] { conf, clusterId, localAddr, metrics }); } } \ No newline at end of file
