Repository: hbase Updated Branches: refs/heads/master ea1552270 -> 647a65ce0
HBASE-16531 Move cell block related code out of IPCUtil Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/647a65ce Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/647a65ce Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/647a65ce Branch: refs/heads/master Commit: 647a65ce01d695bd181c8c5029c4ddd63cab9692 Parents: ea15522 Author: zhangduo <[email protected]> Authored: Tue Aug 30 18:26:42 2016 +0800 Committer: zhangduo <[email protected]> Committed: Wed Aug 31 13:09:22 2016 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/ipc/AbstractRpcClient.java | 34 +-- .../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 8 +- .../hadoop/hbase/ipc/CellBlockBuilder.java | 229 ++++++++++++++++ .../ipc/CellScannerButNoCodecException.java | 31 +++ .../org/apache/hadoop/hbase/ipc/IPCUtil.java | 272 +++++-------------- .../apache/hadoop/hbase/ipc/RpcClientImpl.java | 33 +-- .../hadoop/hbase/ipc/TestCellBlockBuilder.java | 196 +++++++++++++ .../apache/hadoop/hbase/ipc/TestIPCUtil.java | 185 ++----------- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 10 +- .../hadoop/hbase/ipc/AbstractTestIPC.java | 18 -- 10 files changed, 549 insertions(+), 467 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 3d3339a..4d0d16b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -26,10 +26,8 @@ import com.google.protobuf.RpcController; import 
com.google.protobuf.ServiceException; import java.io.IOException; -import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.net.SocketTimeoutException; import java.net.UnknownHostException; import org.apache.commons.logging.Log; @@ -42,7 +40,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodec; -import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -64,7 +61,7 @@ public abstract class AbstractRpcClient implements RpcClient { protected final MetricsConnection metrics; protected UserProvider userProvider; - protected final IPCUtil ipcUtil; + protected final CellBlockBuilder cellBlockBuilder; protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this // time (in ms), it will be closed at any moment. @@ -98,7 +95,7 @@ public abstract class AbstractRpcClient implements RpcClient { HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true); - this.ipcUtil = new IPCUtil(conf); + this.cellBlockBuilder = new CellBlockBuilder(conf); this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes this.conf = conf; @@ -293,33 +290,6 @@ public abstract class AbstractRpcClient implements RpcClient { } /** - * Takes an Exception and the address we were trying to connect to and return an IOException with - * the input exception as the cause. The new exception provides the stack trace of the place where - * the exception is thrown and some extra diagnostics information. 
If the exception is - * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return - * an IOException. - * @param addr target address - * @param exception the relevant exception - * @return an exception to throw - */ - protected IOException wrapException(InetSocketAddress addr, Exception exception) { - if (exception instanceof ConnectException) { - // connection refused; include the host:port in the error - return (ConnectException) new ConnectException("Call to " + addr - + " failed on connection exception: " + exception).initCause(exception); - } else if (exception instanceof SocketTimeoutException) { - return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr - + " failed because " + exception).initCause(exception); - } else if (exception instanceof ConnectionClosingException) { - return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr - + " failed on local exception: " + exception).initCause(exception); - } else { - return (IOException) new IOException("Call to " + addr + " failed on local exception: " - + exception).initCause(exception); - } - } - - /** * Blocking rpc channel that goes via hbase rpc. 
*/ @VisibleForTesting http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index 3d343b4..e368c43 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -260,11 +260,11 @@ public class AsyncRpcClient extends AbstractRpcClient { if (e.getCause() instanceof IOException) { throw (IOException) e.getCause(); } else { - throw wrapException(addr, (Exception) e.getCause()); + throw IPCUtil.wrapException(addr, (Exception) e.getCause()); } } catch (TimeoutException e) { CallTimeoutException cte = new CallTimeoutException(promise.toString()); - throw wrapException(addr, cte); + throw IPCUtil.wrapException(addr, cte); } } @@ -359,7 +359,7 @@ public class AsyncRpcClient extends AbstractRpcClient { * @throws java.io.IOException on error on creation cell scanner */ public CellScanner createCellScanner(byte[] cellBlock) throws IOException { - return ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock); + return cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); } /** @@ -370,7 +370,7 @@ public class AsyncRpcClient extends AbstractRpcClient { * @throws java.io.IOException if block creation fails */ public ByteBuffer buildCellBlock(CellScanner cells) throws IOException { - return ipcUtil.buildCellBlock(this.codec, this.compressor, cells); + return cellBlockBuilder.buildCellBlock(this.codec, this.compressor, cells); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java 
---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java new file mode 100644 index 0000000..072a490 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.io.ByteBufferInputStream; +import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; +import org.apache.hadoop.hbase.io.ByteBufferOutputStream; +import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; + +/** + * Helper class for building cell block. + */ +@InterfaceAudience.Private +public class CellBlockBuilder { + + // LOG is being used in TestCellBlockBuilder + static final Log LOG = LogFactory.getLog(CellBlockBuilder.class); + + private final Configuration conf; + /** + * How much we think the decompressor will expand the original compressed content. + */ + private final int cellBlockDecompressionMultiplier; + + private final int cellBlockBuildingInitialBufferSize; + + public CellBlockBuilder(final Configuration conf) { + this.conf = conf; + this.cellBlockDecompressionMultiplier = conf + .getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3); + + // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in + // #buildCellBlock. 
+ this.cellBlockBuildingInitialBufferSize = ClassSize + .align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024)); + } + + /** + * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or + * <code>compressor</code>. + * @param codec to use for encoding + * @param compressor to use for encoding + * @param cellScanner to encode + * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using + * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has + * been flipped and is ready for reading. Use limit to find total size. + * @throws IOException if encoding the cells fail + */ + public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, + final CellScanner cellScanner) throws IOException { + if (cellScanner == null) { + return null; + } + if (codec == null) { + throw new CellScannerButNoCodecException(); + } + int bufferSize = this.cellBlockBuildingInitialBufferSize; + ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize); + encodeCellsTo(baos, cellScanner, codec, compressor); + if (LOG.isTraceEnabled()) { + if (bufferSize < baos.size()) { + LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() + + "; up hbase.ipc.cellblock.building.initial.buffersize?"); + } + } + ByteBuffer bb = baos.getByteBuffer(); + // If no cells, don't mess around. Just return null (could be a bunch of existence checking + // gets or something -- stuff that does not return a cell). 
+ if (!bb.hasRemaining()) return null; + return bb; + } + + private void encodeCellsTo(ByteBufferOutputStream bbos, CellScanner cellScanner, Codec codec, + CompressionCodec compressor) throws IOException { + OutputStream os = bbos; + Compressor poolCompressor = null; + try { + if (compressor != null) { + if (compressor instanceof Configurable) { + ((Configurable) compressor).setConf(this.conf); + } + poolCompressor = CodecPool.getCompressor(compressor); + os = compressor.createOutputStream(os, poolCompressor); + } + Codec.Encoder encoder = codec.getEncoder(os); + while (cellScanner.advance()) { + encoder.write(cellScanner.current()); + } + encoder.flush(); + } catch (BufferOverflowException e) { + throw new DoNotRetryIOException(e); + } finally { + os.close(); + if (poolCompressor != null) { + CodecPool.returnCompressor(poolCompressor); + } + } + } + + /** + * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or + * <code>compressor</code>. + * @param codec to use for encoding + * @param compressor to use for encoding + * @param cellScanner to encode + * @param pool Pool of ByteBuffers to make use of. + * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using + * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has + * been flipped and is ready for reading. Use limit to find total size. If + * <code>pool</code> was not null, then this returned ByteBuffer came from there and + * should be returned to the pool when done. 
+ * @throws IOException if encoding the cells fail + */ + public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor, + CellScanner cellScanner, ByteBufferPool pool) throws IOException { + if (cellScanner == null) { + return null; + } + if (codec == null) { + throw new CellScannerButNoCodecException(); + } + assert pool != null; + ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool); + encodeCellsTo(bbos, cellScanner, codec, compressor); + if (bbos.size() == 0) { + bbos.releaseResources(); + return null; + } + return bbos; + } + + /** + * @param codec to use for cellblock + * @param cellBlock to encode + * @return CellScanner to work against the content of <code>cellBlock</code> + * @throws IOException if encoding fails + */ + public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, + final byte[] cellBlock) throws IOException { + // Use this method from Client side to create the CellScanner + ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock); + if (compressor != null) { + cellBlockBuf = decompress(compressor, cellBlockBuf); + } + // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will + // make Cells directly over the passed BB. This method is called at client side and we don't + // want the Cells to share the same byte[] where the RPC response is being read. Caching of any + // of the Cells at user's app level will make it not possible to GC the response byte[] + return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf)); + } + + /** + * @param codec to use for cellblock + * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be + * position()'ed at the start of the cell block and limit()'ed at the end. + * @return CellScanner to work against the content of <code>cellBlock</code>. All cells created + * out of the CellScanner will share the same ByteBuffer being passed. 
+ * @throws IOException if cell encoding fails + */ + public CellScanner createCellScannerReusingBuffers(final Codec codec, + final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException { + // Use this method from HRS to create the CellScanner + // If compressed, decompress it first before passing it on else we will leak compression + // resources if the stream is not closed properly after we let it out. + if (compressor != null) { + cellBlock = decompress(compressor, cellBlock); + } + return codec.getDecoder(cellBlock); + } + + private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock) + throws IOException { + // GZIPCodec fails w/ NPE if no configuration. + if (compressor instanceof Configurable) { + ((Configurable) compressor).setConf(this.conf); + } + Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); + CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock), + poolDecompressor); + ByteBufferOutputStream bbos; + try { + // TODO: This is ugly. The buffer will be resized on us if we guess wrong. + // TODO: Reuse buffers. 
+ bbos = new ByteBufferOutputStream( + cellBlock.remaining() * this.cellBlockDecompressionMultiplier); + IOUtils.copy(cis, bbos); + bbos.close(); + cellBlock = bbos.getByteBuffer(); + } finally { + CodecPool.returnDecompressor(poolDecompressor); + } + return cellBlock; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java new file mode 100644 index 0000000..ffd27b3 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Thrown if a cellscanner but no codec to encode it with. 
+ */ +@SuppressWarnings("serial") [email protected] [email protected] +public class CellScannerButNoCodecException extends HBaseIOException { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 74f934c..c18bd7e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -20,221 +20,25 @@ package org.apache.hadoop.hbase.ipc; import com.google.common.base.Preconditions; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Message; + import java.io.IOException; import java.io.OutputStream; -import java.nio.BufferOverflowException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; -import org.apache.commons.io.IOUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseIOException; + import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.io.ByteBufferInputStream; -import org.apache.hadoop.hbase.io.ByteBufferOutputStream; -import org.apache.hadoop.hbase.io.ByteBufferPool; -import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; import 
org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.io.compress.CodecPool; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionInputStream; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.ipc.RemoteException; /** * Utility to help ipc'ing. */ @InterfaceAudience.Private public class IPCUtil { - // LOG is being used in TestIPCUtil - public static final Log LOG = LogFactory.getLog(IPCUtil.class); - /** - * How much we think the decompressor will expand the original compressed content. - */ - private final int cellBlockDecompressionMultiplier; - private final int cellBlockBuildingInitialBufferSize; - private final Configuration conf; - - public IPCUtil(final Configuration conf) { - super(); - this.conf = conf; - this.cellBlockDecompressionMultiplier = - conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3); - - // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in - // #buildCellBlock. - this.cellBlockBuildingInitialBufferSize = - ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024)); - } - - /** - * Thrown if a cellscanner but no codec to encode it with. - */ - public static class CellScannerButNoCodecException extends HBaseIOException {}; - - /** - * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or - * <code>compressor</code>. - * @param codec to use for encoding - * @param compressor to use for encoding - * @param cellScanner to encode - * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using - * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been - * flipped and is ready for reading. Use limit to find total size. 
- * @throws IOException if encoding the cells fail - */ - @SuppressWarnings("resource") - public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, - final CellScanner cellScanner) throws IOException { - if (cellScanner == null) { - return null; - } - if (codec == null) { - throw new CellScannerButNoCodecException(); - } - int bufferSize = this.cellBlockBuildingInitialBufferSize; - ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize); - encodeCellsTo(baos, cellScanner, codec, compressor); - if (LOG.isTraceEnabled()) { - if (bufferSize < baos.size()) { - LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() - + "; up hbase.ipc.cellblock.building.initial.buffersize?"); - } - } - ByteBuffer bb = baos.getByteBuffer(); - // If no cells, don't mess around. Just return null (could be a bunch of existence checking - // gets or something -- stuff that does not return a cell). - if (!bb.hasRemaining()) return null; - return bb; - } - - private void encodeCellsTo(ByteBufferOutputStream bbos, CellScanner cellScanner, Codec codec, - CompressionCodec compressor) throws IOException { - OutputStream os = bbos; - Compressor poolCompressor = null; - try { - if (compressor != null) { - if (compressor instanceof Configurable) { - ((Configurable) compressor).setConf(this.conf); - } - poolCompressor = CodecPool.getCompressor(compressor); - os = compressor.createOutputStream(os, poolCompressor); - } - Codec.Encoder encoder = codec.getEncoder(os); - while (cellScanner.advance()) { - encoder.write(cellScanner.current()); - } - encoder.flush(); - } catch (BufferOverflowException e) { - throw new DoNotRetryIOException(e); - } finally { - os.close(); - if (poolCompressor != null) { - CodecPool.returnCompressor(poolCompressor); - } - } - } - - /** - * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or - * <code>compressor</code>. 
- * @param codec to use for encoding - * @param compressor to use for encoding - * @param cellScanner to encode - * @param pool Pool of ByteBuffers to make use of. - * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using - * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been - * flipped and is ready for reading. Use limit to find total size. If <code>pool</code> was not - * null, then this returned ByteBuffer came from there and should be returned to the pool when - * done. - * @throws IOException if encoding the cells fail - */ - @SuppressWarnings("resource") - public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor, - CellScanner cellScanner, ByteBufferPool pool) throws IOException { - if (cellScanner == null) { - return null; - } - if (codec == null) { - throw new CellScannerButNoCodecException(); - } - assert pool != null; - ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool); - encodeCellsTo(bbos, cellScanner, codec, compressor); - if (bbos.size() == 0) { - bbos.releaseResources(); - return null; - } - return bbos; - } - - /** - * @param codec to use for cellblock - * @param cellBlock to encode - * @return CellScanner to work against the content of <code>cellBlock</code> - * @throws IOException if encoding fails - */ - public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, - final byte[] cellBlock) throws IOException { - // Use this method from Client side to create the CellScanner - ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock); - if (compressor != null) { - cellBlockBuf = decompress(compressor, cellBlockBuf); - } - // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will - // make Cells directly over the passed BB. This method is called at client side and we don't - // want the Cells to share the same byte[] where the RPC response is being read. 
Caching of any - // of the Cells at user's app level will make it not possible to GC the response byte[] - return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf)); - } - - /** - * @param codec to use for cellblock - * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be - * position()'ed at the start of the cell block and limit()'ed at the end. - * @return CellScanner to work against the content of <code>cellBlock</code>. - * All cells created out of the CellScanner will share the same ByteBuffer being passed. - * @throws IOException if cell encoding fails - */ - public CellScanner createCellScannerReusingBuffers(final Codec codec, - final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException { - // Use this method from HRS to create the CellScanner - // If compressed, decompress it first before passing it on else we will leak compression - // resources if the stream is not closed properly after we let it out. - if (compressor != null) { - cellBlock = decompress(compressor, cellBlock); - } - return codec.getDecoder(cellBlock); - } - - private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock) - throws IOException { - // GZIPCodec fails w/ NPE if no configuration. - if (compressor instanceof Configurable) { - ((Configurable) compressor).setConf(this.conf); - } - Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); - CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock), - poolDecompressor); - ByteBufferOutputStream bbos; - try { - // TODO: This is ugly. The buffer will be resized on us if we guess wrong. - // TODO: Reuse buffers. 
- bbos = new ByteBufferOutputStream( - cellBlock.remaining() * this.cellBlockDecompressionMultiplier); - IOUtils.copy(cis, bbos); - bbos.close(); - cellBlock = bbos.getByteBuffer(); - } finally { - CodecPool.returnDecompressor(poolDecompressor); - } - return cellBlock; - } /** * Write out header, param, and cell block if there is one. @@ -246,10 +50,9 @@ public class IPCUtil { * @throws IOException if write action fails */ public static int write(final OutputStream dos, final Message header, final Message param, - final ByteBuffer cellBlock) - throws IOException { + final ByteBuffer cellBlock) throws IOException { // Must calculate total size and write that first so other side can read it all in in one - // swoop. This is dictated by how the server is currently written. Server needs to change + // swoop. This is dictated by how the server is currently written. Server needs to change // if we are to be able to write without the length prefixing. int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param); if (cellBlock != null) { @@ -259,8 +62,7 @@ public class IPCUtil { } private static int write(final OutputStream dos, final Message header, final Message param, - final ByteBuffer cellBlock, final int totalSize) - throws IOException { + final ByteBuffer cellBlock, final int totalSize) throws IOException { // I confirmed toBytes does same as DataOutputStream#writeInt. dos.write(Bytes.toBytes(totalSize)); // This allocates a buffer that is the size of the message internally. @@ -278,9 +80,9 @@ public class IPCUtil { /** * @return Size on the wire when the two messages are written with writeDelimitedTo */ - public static int getTotalSizeWhenWrittenDelimited(Message ... messages) { + public static int getTotalSizeWhenWrittenDelimited(Message... 
messages) { int totalSize = 0; - for (Message m: messages) { + for (Message m : messages) { if (m == null) { continue; } @@ -290,4 +92,52 @@ public class IPCUtil { Preconditions.checkArgument(totalSize < Integer.MAX_VALUE); return totalSize; } + + /** + * @return True if the exception is a fatal connection exception. + */ + public static boolean isFatalConnectionException(final ExceptionResponse e) { + return e.getExceptionClassName().equals(FatalConnectionException.class.getName()); + } + + /** + * @param e exception to be wrapped + * @return RemoteException made from passed <code>e</code> + */ + public static RemoteException createRemoteException(final ExceptionResponse e) { + String innerExceptionClassName = e.getExceptionClassName(); + boolean doNotRetry = e.getDoNotRetry(); + return e.hasHostname() ? + // If a hostname then add it to the RemoteWithExtrasException + new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(), + e.getPort(), doNotRetry) + : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry); + } + + /** + * Takes an Exception and the address we were trying to connect to and return an IOException with + * the input exception as the cause. The new exception provides the stack trace of the place where + * the exception is thrown and some extra diagnostics information. If the exception is + * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return + * an IOException. 
+ * @param addr target address + * @param exception the relevant exception + * @return an exception to throw + */ + public static IOException wrapException(InetSocketAddress addr, Exception exception) { + if (exception instanceof ConnectException) { + // connection refused; include the host:port in the error + return (ConnectException) new ConnectException( + "Call to " + addr + " failed on connection exception: " + exception).initCause(exception); + } else if (exception instanceof SocketTimeoutException) { + return (SocketTimeoutException) new SocketTimeoutException( + "Call to " + addr + " failed because " + exception).initCause(exception); + } else if (exception instanceof ConnectionClosingException) { + return (ConnectionClosingException) new ConnectionClosingException( + "Call to " + addr + " failed on local exception: " + exception).initCause(exception); + } else { + return (IOException) new IOException( + "Call to " + addr + " failed on local exception: " + exception).initCause(exception); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index 37b9afd..03b2953 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -896,7 +896,7 @@ public class RpcClientImpl extends AbstractRpcClient { } builder.setMethodName(call.md.getName()); builder.setRequestParam(call.param != null); - ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells); + ByteBuffer cellBlock = cellBlockBuilder.buildCellBlock(this.codec, this.compressor, call.cells); if (cellBlock != null) { CellBlockMeta.Builder cellBlockBuilder = 
CellBlockMeta.newBuilder(); cellBlockBuilder.setLength(cellBlock.limit()); @@ -997,12 +997,12 @@ public class RpcClientImpl extends AbstractRpcClient { } if (responseHeader.hasException()) { ExceptionResponse exceptionResponse = responseHeader.getException(); - RemoteException re = createRemoteException(exceptionResponse); + RemoteException re = IPCUtil.createRemoteException(exceptionResponse); call.setException(re); call.callStats.setResponseSizeBytes(totalSize); call.callStats.setCallTimeMs( EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime()); - if (isFatalConnectionException(exceptionResponse)) { + if (IPCUtil.isFatalConnectionException(exceptionResponse)) { markClosed(re); } } else { @@ -1017,7 +1017,7 @@ public class RpcClientImpl extends AbstractRpcClient { int size = responseHeader.getCellBlockMeta().getLength(); byte [] cellBlock = new byte[size]; IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length); - cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock); + cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); } call.setResponse(value, cellBlockScanner); call.callStats.setResponseSizeBytes(totalSize); @@ -1044,29 +1044,6 @@ public class RpcClientImpl extends AbstractRpcClient { } } - /** - * @return True if the exception is a fatal connection exception. - */ - private boolean isFatalConnectionException(final ExceptionResponse e) { - return e.getExceptionClassName(). - equals(FatalConnectionException.class.getName()); - } - - /** - * @param e exception to be wrapped - * @return RemoteException made from passed <code>e</code> - */ - private RemoteException createRemoteException(final ExceptionResponse e) { - String innerExceptionClassName = e.getExceptionClassName(); - boolean doNotRetry = e.getDoNotRetry(); - return e.hasHostname()? 
- // If a hostname then add it to the RemoteWithExtrasException - new RemoteWithExtrasException(innerExceptionClassName, - e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry): - new RemoteWithExtrasException(innerExceptionClassName, - e.getStackTrace(), doNotRetry); - } - protected synchronized boolean markClosed(IOException e) { if (e == null){ throw new NullPointerException(); @@ -1322,7 +1299,7 @@ public class RpcClientImpl extends AbstractRpcClient { throw call.error; } // local exception - throw wrapException(addr, call.error); + throw IPCUtil.wrapException(addr, call.error); } return call; http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java new file mode 100644 index 0000000..b780b95 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.commons.lang.time.StopWatch; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.codec.KeyValueCodec; +import org.apache.hadoop.hbase.io.SizedCellScanner; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.log4j.Level; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, SmallTests.class }) +public class TestCellBlockBuilder { + + private static final Log LOG = LogFactory.getLog(TestCellBlockBuilder.class); + + CellBlockBuilder builder; + + @Before + public void before() { + this.builder = new CellBlockBuilder(new Configuration()); + } + + @Test + public void testBuildCellBlock() throws IOException { + doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), null); + doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), new DefaultCodec()); + doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), new GzipCodec()); + } + + static void doBuildCellBlockUndoCellBlock(final CellBlockBuilder util, final 
Codec codec, + final CompressionCodec compressor) throws IOException { + doBuildCellBlockUndoCellBlock(util, codec, compressor, 10, 1, false); + } + + static void doBuildCellBlockUndoCellBlock(final CellBlockBuilder util, final Codec codec, + final CompressionCodec compressor, final int count, final int size, final boolean sized) + throws IOException { + Cell[] cells = getCells(count, size); + CellScanner cellScanner = sized ? getSizedCellScanner(cells) + : CellUtil.createCellScanner(Arrays.asList(cells).iterator()); + ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner); + cellScanner = util.createCellScannerReusingBuffers(codec, compressor, bb); + int i = 0; + while (cellScanner.advance()) { + i++; + } + assertEquals(count, i); + } + + static CellScanner getSizedCellScanner(final Cell[] cells) { + int size = -1; + for (Cell cell : cells) { + size += CellUtil.estimatedSerializedSizeOf(cell); + } + final int totalSize = ClassSize.align(size); + final CellScanner cellScanner = CellUtil.createCellScanner(cells); + return new SizedCellScanner() { + @Override + public long heapSize() { + return totalSize; + } + + @Override + public Cell current() { + return cellScanner.current(); + } + + @Override + public boolean advance() throws IOException { + return cellScanner.advance(); + } + }; + } + + static Cell[] getCells(final int howMany) { + return getCells(howMany, 1024); + } + + static Cell[] getCells(final int howMany, final int valueSize) { + Cell[] cells = new Cell[howMany]; + byte[] value = new byte[valueSize]; + for (int i = 0; i < howMany; i++) { + byte[] index = Bytes.toBytes(i); + KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, value); + cells[i] = kv; + } + return cells; + } + + private static final String COUNT = "--count="; + private static final String SIZE = "--size="; + + /** + * Prints usage and then exits w/ passed <code>errCode</code> + * @param errCode + */ + private static void usage(final int errCode) { + 
System.out.println("Usage: IPCUtil [options]"); + System.out.println("Micro-benchmarking how changed sizes and counts work with buffer resizing"); + System.out.println(" --count Count of Cells"); + System.out.println(" --size Size of Cell values"); + System.out.println("Example: IPCUtil --count=1024 --size=1024"); + System.exit(errCode); + } + + private static void timerTests(final CellBlockBuilder util, final int count, final int size, + final Codec codec, final CompressionCodec compressor) throws IOException { + final int cycles = 1000; + StopWatch timer = new StopWatch(); + timer.start(); + for (int i = 0; i < cycles; i++) { + timerTest(util, timer, count, size, codec, compressor, false); + } + timer.stop(); + LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + false + ", count=" + + count + ", size=" + size + ", + took=" + timer.getTime() + "ms"); + timer.reset(); + timer.start(); + for (int i = 0; i < cycles; i++) { + timerTest(util, timer, count, size, codec, compressor, true); + } + timer.stop(); + LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + true + ", count=" + + count + ", size=" + size + ", + took=" + timer.getTime() + "ms"); + } + + private static void timerTest(final CellBlockBuilder util, final StopWatch timer, final int count, + final int size, final Codec codec, final CompressionCodec compressor, final boolean sized) + throws IOException { + doBuildCellBlockUndoCellBlock(util, codec, compressor, count, size, sized); + } + + /** + * For running a few tests of methods herein. 
+ * @param args + * @throws IOException + */ + public static void main(String[] args) throws IOException { + int count = 1024; + int size = 10240; + for (String arg : args) { + if (arg.startsWith(COUNT)) { + count = Integer.parseInt(arg.replace(COUNT, "")); + } else if (arg.startsWith(SIZE)) { + size = Integer.parseInt(arg.replace(SIZE, "")); + } else { + usage(1); + } + } + CellBlockBuilder util = new CellBlockBuilder(HBaseConfiguration.create()); + ((Log4JLogger) CellBlockBuilder.LOG).getLogger().setLevel(Level.ALL); + timerTests(util, count, size, new KeyValueCodec(), null); + timerTests(util, count, size, new KeyValueCodec(), new DefaultCodec()); + timerTests(util, count, size, new KeyValueCodec(), new GzipCodec()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java index c90b275..ef534c0 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java @@ -17,181 +17,28 @@ */ package org.apache.hadoop.hbase.ipc; -import static org.junit.Assert.assertEquals; +import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; +import static org.junit.Assert.assertTrue; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; -import org.apache.commons.lang.time.StopWatch; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import 
org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.codec.KeyValueCodec; -import org.apache.hadoop.hbase.io.SizedCellScanner; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.DefaultCodec; -import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.log4j.Level; -import org.junit.Before; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.junit.Test; -import org.junit.experimental.categories.Category; -@Category({ClientTests.class, SmallTests.class}) public class TestIPCUtil { - private static final Log LOG = LogFactory.getLog(TestIPCUtil.class); - - IPCUtil util; - @Before - public void before() { - this.util = new IPCUtil(new Configuration()); - } - @Test - public void testBuildCellBlock() throws IOException { - doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), null); - doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new DefaultCodec()); - doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new GzipCodec()); - } - - static void doBuildCellBlockUndoCellBlock(final IPCUtil util, - final Codec codec, final CompressionCodec compressor) - throws IOException { - doBuildCellBlockUndoCellBlock(util, codec, compressor, 10, 1, false); - } - - static void doBuildCellBlockUndoCellBlock(final IPCUtil util, final Codec codec, - final CompressionCodec compressor, final int count, final int size, final boolean sized) - throws IOException { - Cell [] cells = getCells(count, size); - CellScanner cellScanner = sized? 
getSizedCellScanner(cells): - CellUtil.createCellScanner(Arrays.asList(cells).iterator()); - ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner); - cellScanner = util.createCellScannerReusingBuffers(codec, compressor, bb); - int i = 0; - while (cellScanner.advance()) { - i++; - } - assertEquals(count, i); - } - - static CellScanner getSizedCellScanner(final Cell [] cells) { - int size = -1; - for (Cell cell: cells) { - size += CellUtil.estimatedSerializedSizeOf(cell); - } - final int totalSize = ClassSize.align(size); - final CellScanner cellScanner = CellUtil.createCellScanner(cells); - return new SizedCellScanner() { - @Override - public long heapSize() { - return totalSize; - } - - @Override - public Cell current() { - return cellScanner.current(); - } - - @Override - public boolean advance() throws IOException { - return cellScanner.advance(); - } - }; - } - - static Cell [] getCells(final int howMany) { - return getCells(howMany, 1024); - } - - static Cell [] getCells(final int howMany, final int valueSize) { - Cell [] cells = new Cell[howMany]; - byte [] value = new byte[valueSize]; - for (int i = 0; i < howMany; i++) { - byte [] index = Bytes.toBytes(i); - KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, value); - cells[i] = kv; - } - return cells; - } - - private static final String COUNT = "--count="; - private static final String SIZE = "--size="; - - /** - * Prints usage and then exits w/ passed <code>errCode</code> - * @param errCode - */ - private static void usage(final int errCode) { - System.out.println("Usage: IPCUtil [options]"); - System.out.println("Micro-benchmarking how changed sizes and counts work with buffer resizing"); - System.out.println(" --count Count of Cells"); - System.out.println(" --size Size of Cell values"); - System.out.println("Example: IPCUtil --count=1024 --size=1024"); - System.exit(errCode); - } - - private static void timerTests(final IPCUtil util, final int count, final int size, - final Codec 
codec, final CompressionCodec compressor) - throws IOException { - final int cycles = 1000; - StopWatch timer = new StopWatch(); - timer.start(); - for (int i = 0; i < cycles; i++) { - timerTest(util, timer, count, size, codec, compressor, false); - } - timer.stop(); - LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + false + - ", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + "ms"); - timer.reset(); - timer.start(); - for (int i = 0; i < cycles; i++) { - timerTest(util, timer, count, size, codec, compressor, true); - } - timer.stop(); - LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + true + - ", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + "ms"); - } - - private static void timerTest(final IPCUtil util, final StopWatch timer, final int count, - final int size, final Codec codec, final CompressionCodec compressor, final boolean sized) - throws IOException { - doBuildCellBlockUndoCellBlock(util, codec, compressor, count, size, sized); - } - - /** - * For running a few tests of methods herein. 
- * @param args - * @throws IOException - */ - public static void main(String[] args) throws IOException { - int count = 1024; - int size = 10240; - for (String arg: args) { - if (arg.startsWith(COUNT)) { - count = Integer.parseInt(arg.replace(COUNT, "")); - } else if (arg.startsWith(SIZE)) { - size = Integer.parseInt(arg.replace(SIZE, "")); - } else { - usage(1); - } - } - IPCUtil util = new IPCUtil(HBaseConfiguration.create()); - ((Log4JLogger)IPCUtil.LOG).getLogger().setLevel(Level.ALL); - timerTests(util, count, size, new KeyValueCodec(), null); - timerTests(util, count, size, new KeyValueCodec(), new DefaultCodec()); - timerTests(util, count, size, new KeyValueCodec(), new GzipCodec()); + public void testWrapException() throws Exception { + final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0); + assertTrue(wrapException(address, new ConnectException()) instanceof ConnectException); + assertTrue( + wrapException(address, new SocketTimeoutException()) instanceof SocketTimeoutException); + assertTrue(wrapException(address, new ConnectionClosingException( + "Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException); + assertTrue( + wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException")) + .getCause() instanceof CallTimeoutException); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 4b27924..759da82 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -183,7 +183,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { */ static final int 
DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; - private final IPCUtil ipcUtil; + private final CellBlockBuilder cellBlockBuilder; private static final String AUTH_FAILED_FOR = "Auth failed for "; private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for "; @@ -434,14 +434,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { List<ByteBuffer> cellBlock = null; int cellBlockSize = 0; if (reservoir != null) { - this.cellBlockStream = ipcUtil.buildCellBlockStream(this.connection.codec, + this.cellBlockStream = cellBlockBuilder.buildCellBlockStream(this.connection.codec, this.connection.compressionCodec, cells, reservoir); if (this.cellBlockStream != null) { cellBlock = this.cellBlockStream.getByteBuffers(); cellBlockSize = this.cellBlockStream.size(); } } else { - ByteBuffer b = ipcUtil.buildCellBlock(this.connection.codec, + ByteBuffer b = cellBlockBuilder.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells); if (b != null) { cellBlockSize = b.remaining(); @@ -1861,7 +1861,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } if (header.hasCellBlockMeta()) { buf.position(offset); - cellScanner = ipcUtil.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf); + cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf); } } catch (Throwable t) { InetSocketAddress address = getListenerAddress(); @@ -2058,7 +2058,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true); this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true); - this.ipcUtil = new IPCUtil(conf); + this.cellBlockBuilder = new CellBlockBuilder(conf); // Create the responder here http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 4cfa25c..be5ad56 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -35,10 +35,8 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import java.io.IOException; -import java.net.ConnectException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; @@ -54,7 +52,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; @@ -394,19 +391,4 @@ public abstract class AbstractTestIPC { rpcServer.stop(); } } - - @Test - public void testWrapException() throws Exception { - AbstractRpcClient client = - (AbstractRpcClient) RpcClientFactory.createClient(CONF, "AbstractTestIPC"); - final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0); - assertTrue(client.wrapException(address, new ConnectException()) instanceof ConnectException); - assertTrue(client.wrapException(address, - new SocketTimeoutException()) instanceof SocketTimeoutException); - assertTrue(client.wrapException(address, new ConnectionClosingException( - "Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException); - 
assertTrue(client - .wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException")) - .getCause() instanceof CallTimeoutException); - } }
