HBASE-12295 Prevent block eviction under us if reads are in progress from the BBs (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ccb22bd8 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ccb22bd8 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ccb22bd8 Branch: refs/heads/master Commit: ccb22bd80dfae64ff27f660254afb224dce268f0 Parents: 3b6db26 Author: ramkrishna <[email protected]> Authored: Tue Jul 21 21:15:32 2015 +0530 Committer: ramkrishna <[email protected]> Committed: Tue Jul 21 21:15:32 2015 +0530 ---------------------------------------------------------------------- .../apache/hadoop/hbase/ShareableMemory.java | 36 + .../io/encoding/BufferedDataBlockEncoder.java | 2 +- .../hadoop/hbase/io/hfile/HFileContext.java | 28 +- .../hbase/io/hfile/HFileContextBuilder.java | 9 +- .../hadoop/hbase/io/hfile/BlockCache.java | 13 + .../apache/hadoop/hbase/io/hfile/Cacheable.java | 15 + .../hbase/io/hfile/CacheableDeserializer.java | 5 +- .../hbase/io/hfile/CombinedBlockCache.java | 14 + .../hbase/io/hfile/CompoundBloomFilter.java | 14 +- .../org/apache/hadoop/hbase/io/hfile/HFile.java | 9 +- .../hadoop/hbase/io/hfile/HFileBlock.java | 34 +- .../hadoop/hbase/io/hfile/HFileBlockIndex.java | 153 +- .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 315 ++-- .../hadoop/hbase/io/hfile/LruBlockCache.java | 9 +- .../hbase/io/hfile/MemcachedBlockCache.java | 9 +- .../hbase/io/hfile/bucket/BucketCache.java | 119 +- .../io/hfile/bucket/ByteBufferIOEngine.java | 37 +- .../hbase/io/hfile/bucket/FileIOEngine.java | 27 +- .../hadoop/hbase/io/hfile/bucket/IOEngine.java | 21 +- .../hadoop/hbase/regionserver/HRegion.java | 123 +- .../hadoop/hbase/regionserver/KeyValueHeap.java | 7 +- .../hbase/regionserver/RSRpcServices.java | 124 +- .../regionserver/ReversedRegionScannerImpl.java | 4 +- .../hadoop/hbase/regionserver/StoreFile.java | 11 +- .../hadoop/hbase/regionserver/StoreScanner.java | 9 +- .../hadoop/hbase/HBaseTestingUtility.java | 28 +- .../client/TestBlockEvictionFromClient.java 
| 1441 ++++++++++++++++++ .../hadoop/hbase/io/hfile/CacheTestUtils.java | 7 +- .../hadoop/hbase/io/hfile/TestCacheConfig.java | 8 +- .../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 29 + .../hbase/io/hfile/TestCachedBlockQueue.java | 5 + .../apache/hadoop/hbase/io/hfile/TestHFile.java | 2 +- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 6 +- .../hbase/io/hfile/TestHFileBlockIndex.java | 4 + .../hbase/io/hfile/TestLruBlockCache.java | 5 + .../io/hfile/bucket/TestByteBufferIOEngine.java | 13 +- .../hbase/io/hfile/bucket/TestFileIOEngine.java | 7 +- .../regionserver/TestHeapMemoryManager.java | 11 +- .../TestScannerHeartbeatMessages.java | 16 +- 39 files changed, 2377 insertions(+), 352 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-common/src/main/java/org/apache/hadoop/hbase/ShareableMemory.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ShareableMemory.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ShareableMemory.java new file mode 100644 index 0000000..f8b9127 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ShareableMemory.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A cell implementing this interface would mean that the memory area backing this cell will refer + * to a memory area that could be part of a larger common memory area used by the + * RegionServer. If an exclusive instance is required, use the {@link #cloneToCell()} to have the + * contents of the cell copied to an exclusive memory area. + */ +@InterfaceAudience.Private +public interface ShareableMemory { + /** + * Does a deep copy of the contents to a new memory area and + * returns it in the form of a cell. + * @return Cell the deep cloned cell + */ + public Cell cloneToCell(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java index 1a65223..a758b26 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java @@ -283,7 +283,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { /** * Copies only the key part of the keybuffer by doing a deep copy and passes the * seeker state members for taking a clone. 
- * Note that the value byte[] part is still pointing to the currentBuffer and the + * Note that the value byte[] part is still pointing to the currentBuffer and * represented by the valueOffset and valueLength */ // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java index 5edd47d..9945146 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java @@ -56,6 +56,7 @@ public class HFileContext implements HeapSize, Cloneable { /** Encryption algorithm and key used */ private Encryption.Context cryptoContext = Encryption.Context.NONE; private long fileCreateTime; + private String hfileName; //Empty constructor. 
Go with setters public HFileContext() { @@ -77,12 +78,13 @@ public class HFileContext implements HeapSize, Cloneable { this.encoding = context.encoding; this.cryptoContext = context.cryptoContext; this.fileCreateTime = context.fileCreateTime; + this.hfileName = context.hfileName; } - public HFileContext(boolean useHBaseChecksum, boolean includesMvcc, boolean includesTags, + HFileContext(boolean useHBaseChecksum, boolean includesMvcc, boolean includesTags, Compression.Algorithm compressAlgo, boolean compressTags, ChecksumType checksumType, int bytesPerChecksum, int blockSize, DataBlockEncoding encoding, - Encryption.Context cryptoContext, long fileCreateTime) { + Encryption.Context cryptoContext, long fileCreateTime, String hfileName) { this.usesHBaseChecksum = useHBaseChecksum; this.includesMvcc = includesMvcc; this.includesTags = includesTags; @@ -96,6 +98,7 @@ public class HFileContext implements HeapSize, Cloneable { } this.cryptoContext = cryptoContext; this.fileCreateTime = fileCreateTime; + this.hfileName = hfileName; } /** @@ -119,10 +122,6 @@ public class HFileContext implements HeapSize, Cloneable { return compressAlgo; } - public void setCompression(Compression.Algorithm compressAlgo) { - this.compressAlgo = compressAlgo; - } - public boolean isUseHBaseChecksum() { return usesHBaseChecksum; } @@ -175,10 +174,6 @@ public class HFileContext implements HeapSize, Cloneable { return encoding; } - public void setDataBlockEncoding(DataBlockEncoding encoding) { - this.encoding = encoding; - } - public Encryption.Context getEncryptionContext() { return cryptoContext; } @@ -187,6 +182,10 @@ public class HFileContext implements HeapSize, Cloneable { this.cryptoContext = cryptoContext; } + public String getHFileName() { + return this.hfileName; + } + /** * HeapSize implementation * NOTE : The heapsize should be altered as and when new state variable are added @@ -196,11 +195,14 @@ public class HFileContext implements HeapSize, Cloneable { public long heapSize() { 
long size = ClassSize.align(ClassSize.OBJECT + // Algorithm reference, encodingon, checksumtype, Encryption.Context reference - 4 * ClassSize.REFERENCE + + 5 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + // usesHBaseChecksum, includesMvcc, includesTags and compressTags 4 * Bytes.SIZEOF_BOOLEAN + Bytes.SIZEOF_LONG); + if (this.hfileName != null) { + size += ClassSize.STRING + this.hfileName.length(); + } return size; } @@ -227,6 +229,10 @@ public class HFileContext implements HeapSize, Cloneable { sb.append(" compressAlgo="); sb.append(compressAlgo); sb.append(" compressTags="); sb.append(compressTags); sb.append(" cryptoContext=[ "); sb.append(cryptoContext); sb.append(" ]"); + if (hfileName != null) { + sb.append(" name="); + sb.append(hfileName); + } sb.append(" ]"); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java index a903974..ce3541f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java @@ -53,6 +53,8 @@ public class HFileContextBuilder { private Encryption.Context cryptoContext = Encryption.Context.NONE; private long fileCreateTime = 0; + private String hfileName = null; + public HFileContextBuilder withHBaseCheckSum(boolean useHBaseCheckSum) { this.usesHBaseChecksum = useHBaseCheckSum; return this; @@ -108,9 +110,14 @@ public class HFileContextBuilder { return this; } + public HFileContextBuilder withHFileName(String name) { + this.hfileName = name; + return this; + } + public HFileContext build() { return new HFileContext(usesHBaseChecksum, includesMvcc, 
includesTags, compression, compressTags, checksumType, bytesPerChecksum, blocksize, encoding, cryptoContext, - fileCreateTime); + fileCreateTime, hfileName); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index 57c4be9..cef7e02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile; import java.util.Iterator; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; /** * Block cache interface. Anything that implements the {@link Cacheable} @@ -116,4 +117,16 @@ public interface BlockCache extends Iterable<CachedBlock> { * @return The list of sub blockcaches that make up this one; returns null if no sub caches. */ BlockCache [] getBlockCaches(); + + /** + * Called when the scanner using the block decides to return the block once its usage + * is over. + * This API should be called after the block is used, failing to do so may have adverse effects + * by preventing the blocks from being evicted because of which it will prevent new hot blocks + * from getting added to the block cache. The implementation of the BlockCache will decide + * on what to be done with the block based on the memory type of the block's {@link MemoryType}. 
+ * @param cacheKey the cache key of the block + * @param block the hfileblock to be returned + */ + void returnBlock(BlockCacheKey cacheKey, Cacheable block); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java index f611c61..3c7a19e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java @@ -60,4 +60,19 @@ public interface Cacheable extends HeapSize { * @return the block type of this cached HFile block */ BlockType getBlockType(); + + /** + * @return the {@code MemoryType} of this Cacheable + */ + MemoryType getMemoryType(); + + /** + * SHARED means when this Cacheable is read back from cache it refers to the same memory area as + * used by the cache for caching it. + * EXCLUSIVE means when this Cacheable is read back from cache, the data was copied to an + * exclusive memory area of this Cacheable. 
+ */ + public static enum MemoryType { + SHARED, EXCLUSIVE; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java index 26555ef..a385fd6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.hfile; import java.io.IOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; import org.apache.hadoop.hbase.nio.ByteBuff; /** @@ -36,14 +37,14 @@ public interface CacheableDeserializer<T extends Cacheable> { T deserialize(ByteBuff b) throws IOException; /** - * * @param b * @param reuse true if Cacheable object can use the given buffer as its * content + * @param memType the {@link MemoryType} of the buffer * @return T the deserialized object. * @throws IOException */ - T deserialize(ByteBuff b, boolean reuse) throws IOException; + T deserialize(ByteBuff b, boolean reuse, MemoryType memType) throws IOException; /** * Get the identifier of this deserialiser. 
Identifier is unique for each http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index 7725cf9..33b0d98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import com.google.common.annotations.VisibleForTesting; + /** * CombinedBlockCache is an abstraction layer that combines @@ -219,4 +221,16 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { public void setMaxSize(long size) { this.lruCache.setMaxSize(size); } + + @Override + public void returnBlock(BlockCacheKey cacheKey, Cacheable block) { + // A noop + this.lruCache.returnBlock(cacheKey, block); + this.l2Cache.returnBlock(cacheKey, block); + } + + @VisibleForTesting + public int getRefCount(BlockCacheKey cacheKey) { + return ((BucketCache) this.l2Cache).getRefCount(cacheKey); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java index 6890884..9d39a03 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java @@ -119,11 +119,15 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase "Failed to load Bloom block for key " + Bytes.toStringBinary(key, keyOffset, keyLength), ex); } - - ByteBuff bloomBuf = bloomBlock.getBufferReadOnly(); - result = BloomFilterUtil.contains(key, keyOffset, keyLength, - bloomBuf, bloomBlock.headerSize(), - bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount); + try { + ByteBuff bloomBuf = bloomBlock.getBufferReadOnly(); + result = + BloomFilterUtil.contains(key, keyOffset, keyLength, bloomBuf, bloomBlock.headerSize(), + bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount); + } finally { + // After the use return back the block if it was served from a cache. + reader.returnBlock(bloomBlock); + } } if (numQueriesPerChunk != null && block >= 0) { http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 71ac506..48636a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; @@ -374,6 +373,12 @@ public class HFile { final boolean updateCacheMetrics, BlockType 
expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException; + + /** + * Return the given block back to the cache, if it was obtained from cache. + * @param block Block to be returned. + */ + void returnBlock(HFileBlock block); } /** An interface used by clients to open and iterate an {@link HFile}. */ @@ -389,7 +394,7 @@ public class HFile { HFileScanner getScanner(boolean cacheBlocks, final boolean pread, final boolean isCompaction); - ByteBuff getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException; + HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException; Map<byte[], byte[]> loadFileInfo() throws IOException; http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 8672d62..7104267 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -121,7 +121,8 @@ public class HFileBlock implements Cacheable { static final CacheableDeserializer<Cacheable> blockDeserializer = new CacheableDeserializer<Cacheable>() { - public HFileBlock deserialize(ByteBuff buf, boolean reuse) throws IOException{ + public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType) + throws IOException { buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind(); ByteBuff newByteBuffer; if (reuse) { @@ -135,7 +136,7 @@ public class HFileBlock implements Cacheable { buf.position(buf.limit()); buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE); boolean usesChecksum = buf.get() == (byte)1; - HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum); 
+ HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum, memType); hFileBlock.offset = buf.getLong(); hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt(); if (hFileBlock.hasNextBlockHeader()) { @@ -152,7 +153,7 @@ public class HFileBlock implements Cacheable { @Override public HFileBlock deserialize(ByteBuff b) throws IOException { // Used only in tests - return deserialize(b, false); + return deserialize(b, false, MemoryType.EXCLUSIVE); } }; private static final int deserializerIdentifier; @@ -198,6 +199,8 @@ public class HFileBlock implements Cacheable { */ private int nextBlockOnDiskSizeWithHeader = -1; + private MemoryType memType = MemoryType.EXCLUSIVE; + /** * Creates a new {@link HFile} block from the given fields. This constructor * is mostly used when the block data has already been read and uncompressed, @@ -255,15 +258,24 @@ public class HFileBlock implements Cacheable { HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException { this(new SingleByteBuff(b), usesHBaseChecksum); } + /** * Creates a block from an existing buffer starting with a header. Rewinds * and takes ownership of the buffer. By definition of rewind, ignores the * buffer position, but if you slice the buffer beforehand, it will rewind - * to that point. The reason this has a minorNumber and not a majorNumber is - * because majorNumbers indicate the format of a HFile whereas minorNumbers - * indicate the format inside a HFileBlock. + * to that point. */ HFileBlock(ByteBuff b, boolean usesHBaseChecksum) throws IOException { + this(b, usesHBaseChecksum, MemoryType.EXCLUSIVE); + } + + /** + * Creates a block from an existing buffer starting with a header. Rewinds + * and takes ownership of the buffer. By definition of rewind, ignores the + * buffer position, but if you slice the buffer beforehand, it will rewind + * to that point. 
+ */ + HFileBlock(ByteBuff b, boolean usesHBaseChecksum, MemoryType memType) throws IOException { b.rewind(); blockType = BlockType.read(b); onDiskSizeWithoutHeader = b.getInt(); @@ -282,6 +294,7 @@ public class HFileBlock implements Cacheable { HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM; } this.fileContext = contextBuilder.build(); + this.memType = memType; buf = b; buf.rewind(); } @@ -650,8 +663,8 @@ public class HFileBlock implements Cacheable { public long heapSize() { long size = ClassSize.align( ClassSize.OBJECT + - // Block type, multi byte buffer and meta references - 3 * ClassSize.REFERENCE + + // Block type, multi byte buffer, MemoryType and meta references + 4 * ClassSize.REFERENCE + // On-disk size, uncompressed size, and next block's on-disk size // bytePerChecksum and onDiskDataSize 4 * Bytes.SIZEOF_INT + @@ -1885,6 +1898,11 @@ public class HFileBlock implements Cacheable { return this.fileContext; } + @Override + public MemoryType getMemoryType() { + return this.memType; + } + /** * Convert the contents of the block header into a human readable string. * This is mostly helpful for debugging. 
This assumes that the block http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index 30cf7ab..3494145 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.hbase.Cell; @@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader; @@ -295,78 +295,90 @@ public class HFileBlockIndex { int lookupLevel = 1; // How many levels deep we are in our lookup. int index = -1; - HFileBlock block; + HFileBlock block = null; + boolean dataBlock = false; KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue(); while (true) { - - if (currentBlock != null && currentBlock.getOffset() == currentOffset) - { - // Avoid reading the same block again, even with caching turned off. - // This is crucial for compaction-type workload which might have - // caching turned off. 
This is like a one-block cache inside the - // scanner. - block = currentBlock; - } else { - // Call HFile's caching block reader API. We always cache index - // blocks, otherwise we might get terrible performance. - boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel); - BlockType expectedBlockType; - if (lookupLevel < searchTreeLevel - 1) { - expectedBlockType = BlockType.INTERMEDIATE_INDEX; - } else if (lookupLevel == searchTreeLevel - 1) { - expectedBlockType = BlockType.LEAF_INDEX; + try { + if (currentBlock != null && currentBlock.getOffset() == currentOffset) { + // Avoid reading the same block again, even with caching turned off. + // This is crucial for compaction-type workload which might have + // caching turned off. This is like a one-block cache inside the + // scanner. + block = currentBlock; } else { - // this also accounts for ENCODED_DATA - expectedBlockType = BlockType.DATA; + // Call HFile's caching block reader API. We always cache index + // blocks, otherwise we might get terrible performance. 
+ boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel); + BlockType expectedBlockType; + if (lookupLevel < searchTreeLevel - 1) { + expectedBlockType = BlockType.INTERMEDIATE_INDEX; + } else if (lookupLevel == searchTreeLevel - 1) { + expectedBlockType = BlockType.LEAF_INDEX; + } else { + // this also accounts for ENCODED_DATA + expectedBlockType = BlockType.DATA; + } + block = + cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, pread, + isCompaction, true, expectedBlockType, expectedDataBlockEncoding); } - block = cachingBlockReader.readBlock(currentOffset, - currentOnDiskSize, shouldCache, pread, isCompaction, true, - expectedBlockType, expectedDataBlockEncoding); - } - if (block == null) { - throw new IOException("Failed to read block at offset " + - currentOffset + ", onDiskSize=" + currentOnDiskSize); - } + if (block == null) { + throw new IOException("Failed to read block at offset " + currentOffset + + ", onDiskSize=" + currentOnDiskSize); + } - // Found a data block, break the loop and check our level in the tree. - if (block.getBlockType().isData()) { - break; - } + // Found a data block, break the loop and check our level in the tree. + if (block.getBlockType().isData()) { + dataBlock = true; + break; + } - // Not a data block. This must be a leaf-level or intermediate-level - // index block. We don't allow going deeper than searchTreeLevel. - if (++lookupLevel > searchTreeLevel) { - throw new IOException("Search Tree Level overflow: lookupLevel="+ - lookupLevel + ", searchTreeLevel=" + searchTreeLevel); - } + // Not a data block. This must be a leaf-level or intermediate-level + // index block. We don't allow going deeper than searchTreeLevel. 
+ if (++lookupLevel > searchTreeLevel) { + throw new IOException("Search Tree Level overflow: lookupLevel=" + lookupLevel + + ", searchTreeLevel=" + searchTreeLevel); + } - // Locate the entry corresponding to the given key in the non-root - // (leaf or intermediate-level) index block. - ByteBuff buffer = block.getBufferWithoutHeader(); - index = locateNonRootIndexEntry(buffer, key, comparator); - if (index == -1) { - // This has to be changed - // For now change this to key value - KeyValue kv = KeyValueUtil.ensureKeyValue(key); - throw new IOException("The key " - + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength()) - + " is before the" + " first key of the non-root index block " - + block); - } + // Locate the entry corresponding to the given key in the non-root + // (leaf or intermediate-level) index block. + ByteBuff buffer = block.getBufferWithoutHeader(); + index = locateNonRootIndexEntry(buffer, key, comparator); + if (index == -1) { + // This has to be changed + // For now change this to key value + KeyValue kv = KeyValueUtil.ensureKeyValue(key); + throw new IOException("The key " + + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength()) + + " is before the" + " first key of the non-root index block " + block); + } - currentOffset = buffer.getLong(); - currentOnDiskSize = buffer.getInt(); + currentOffset = buffer.getLong(); + currentOnDiskSize = buffer.getInt(); - // Only update next indexed key if there is a next indexed key in the current level - byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1); - if (nonRootIndexedKey != null) { - tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length); - nextIndexedKey = tmpNextIndexKV; + // Only update next indexed key if there is a next indexed key in the current level + byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1); + if (nonRootIndexedKey != null) { + tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length); + 
nextIndexedKey = tmpNextIndexKV; + } + } finally { + if (!dataBlock) { + // Return the block immediately if it is not the + // data block + cachingBlockReader.returnBlock(block); + } } } if (lookupLevel != searchTreeLevel) { + assert dataBlock == true; + // Though we have retrieved a data block we have found an issue + // in the retrieved data block. Hence returned the block so that + // the ref count can be decremented + cachingBlockReader.returnBlock(block); throw new IOException("Reached a data block at level " + lookupLevel + " but the number of levels is " + searchTreeLevel); } @@ -396,16 +408,19 @@ public class HFileBlockIndex { HFileBlock midLeafBlock = cachingBlockReader.readBlock( midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true, BlockType.LEAF_INDEX, null); - - ByteBuff b = midLeafBlock.getBufferWithoutHeader(); - int numDataBlocks = b.getIntAfterPosition(0); - int keyRelOffset = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 1)); - int keyLen = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - - keyRelOffset; - int keyOffset = Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset - + SECONDARY_INDEX_ENTRY_OVERHEAD; - byte[] bytes = b.toBytes(keyOffset, keyLen); - targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length); + try { + ByteBuff b = midLeafBlock.getBufferWithoutHeader(); + int numDataBlocks = b.getIntAfterPosition(0); + int keyRelOffset = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 1)); + int keyLen = b.getIntAfterPosition(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - keyRelOffset; + int keyOffset = + Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset + + SECONDARY_INDEX_ENTRY_OVERHEAD; + byte[] bytes = b.toBytes(keyOffset, keyLen); + targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length); + } finally { + cachingBlockReader.returnBlock(midLeafBlock); + } } else { // The middle of the root-level index. 
targetMidKey = blockKeys[rootCount / 2]; http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 4189320..4a11b14 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -34,10 +34,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ShareableMemory; import org.apache.hadoop.hbase.SizeCachedKeyValue; -import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.compress.Compression; @@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; +import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.security.EncryptionUtil; @@ -256,6 +258,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false, null, null); + // Need not update the current block. 
Ideally here the readBlock won't find the + // block in cache. We call this readBlock so that block data is read from FS and + // cached in BC. So there is no reference count increment that happens here. + // The return will ideally be a noop because the block is not of MemoryType SHARED. + returnBlock(block); prevBlock = block; offset += block.getOnDiskSizeWithHeader(); } @@ -337,6 +344,15 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { return fileSize; } + @Override + public void returnBlock(HFileBlock block) { + BlockCache blockCache = this.cacheConf.getBlockCache(); + if (blockCache != null && block != null) { + BlockCacheKey cacheKey = new BlockCacheKey(this.getFileContext().getHFileName(), + block.getOffset()); + blockCache.returnBlock(cacheKey, block); + } + } /** * @return the first key in the file. May be null if file has no entries. Note * that this is not the first row key, but rather the byte form of the @@ -449,7 +465,6 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { protected final HFile.Reader reader; private int currTagsLen; private KeyValue.KeyOnlyKeyValue keyOnlyKv = new KeyValue.KeyOnlyKeyValue(); - protected HFileBlock block; // A pair for reusing in blockSeek() so that we don't garbage lot of objects final Pair<ByteBuffer, Integer> pair = new Pair<ByteBuffer, Integer>(); @@ -461,6 +476,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { * If the nextIndexedKey is null, it means the nextIndexedKey has not been loaded yet. 
*/ protected Cell nextIndexedKey; + // Current block being used + protected HFileBlock curBlock; + // Previous blocks that were used in the course of the read + protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<HFileBlock>(); public HFileScannerImpl(final HFile.Reader reader, final boolean cacheBlocks, final boolean pread, final boolean isCompaction) { @@ -470,6 +489,41 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { this.isCompaction = isCompaction; } + void updateCurrBlockRef(HFileBlock block) { + if (block != null && this.curBlock != null && + block.getOffset() == this.curBlock.getOffset()) { + return; + } + if (this.curBlock != null) { + prevBlocks.add(this.curBlock); + } + this.curBlock = block; + } + + void reset() { + if (this.curBlock != null) { + this.prevBlocks.add(this.curBlock); + } + this.curBlock = null; + } + + private void returnBlockToCache(HFileBlock block) { + if (LOG.isTraceEnabled()) { + LOG.trace("Returning the block : " + block); + } + this.reader.returnBlock(block); + } + + private void returnBlocks(boolean returnAll) { + for (int i = 0; i < this.prevBlocks.size(); i++) { + returnBlockToCache(this.prevBlocks.get(i)); + } + this.prevBlocks.clear(); + if (returnAll && this.curBlock != null) { + returnBlockToCache(this.curBlock); + this.curBlock = null; + } + } @Override public boolean isSeeked(){ return blockBuffer != null; @@ -498,6 +552,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { return kvBufSize; } + @Override + public void close() { + this.returnBlocks(true); + } + protected int getNextCellStartPosition() { int nextKvPos = blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen + currMemstoreTSLen; @@ -536,7 +595,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { private final void checkTagsLen() { if (checkLen(this.currTagsLen)) { throw new IllegalStateException("Invalid currTagsLen " + this.currTagsLen + - ". 
Block offset: " + block.getOffset() + ", block length: " + this.blockBuffer.limit() + + ". Block offset: " + curBlock.getOffset() + ", block length: " + + this.blockBuffer.limit() + ", position: " + this.blockBuffer.position() + " (without header)."); } } @@ -610,7 +670,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { || vlen > blockBuffer.limit()) { throw new IllegalStateException("Invalid klen " + klen + " or vlen " + vlen + ". Block offset: " - + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + curBlock.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + blockBuffer.position() + " (without header)."); } offsetFromPos += Bytes.SIZEOF_LONG; @@ -626,7 +686,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { ^ (blockBuffer.getByteAfterPosition(offsetFromPos + 1) & 0xff); if (tlen < 0 || tlen > blockBuffer.limit()) { throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: " - + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + curBlock.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + blockBuffer.position() + " (without header)."); } // add the two bytes read for the tags. @@ -641,8 +701,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { if (lastKeyValueSize < 0) { throw new IllegalStateException("blockSeek with seekBefore " + "at the first key of the block: key=" + CellUtil.getCellKeyAsString(key) - + ", blockOffset=" + block.getOffset() + ", onDiskSize=" - + block.getOnDiskSizeWithHeader()); + + ", blockOffset=" + curBlock.getOffset() + ", onDiskSize=" + + curBlock.getOnDiskSizeWithHeader()); } blockBuffer.moveBack(lastKeyValueSize); readKeyValueLen(); @@ -709,8 +769,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { // smaller than // the next indexed key or the current data block is the last data // block. 
- return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false); + return loadBlockAndSeekToKey(this.curBlock, nextIndexedKey, false, key, + false); } + } } // Don't rewind on a reseek operation, because reseek implies that we are @@ -734,10 +796,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { */ public int seekTo(Cell key, boolean rewind) throws IOException { HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader(); - BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block, + BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, curBlock, cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding()); if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) { - // This happens if the key e.g. falls before the beginning of the file. + // This happens if the key e.g. falls before the beginning of the + // file. return -1; } return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(), @@ -746,7 +809,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { @Override public boolean seekBefore(Cell key) throws IOException { - HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block, + HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, curBlock, cacheBlocks, pread, isCompaction, reader.getEffectiveEncodingInCache(isCompaction)); if (seekToBlock == null) { return false; @@ -761,6 +824,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { return false; } + // The first key in the current block 'seekToBlock' is greater than the given + // seekBefore key. We will go ahead by reading the next block that satisfies the + // given key. Return the current block before reading the next one. 
+ reader.returnBlock(seekToBlock); // It is important that we compute and pass onDiskSize to the block // reader so that it does not have to read the header separately to // figure out the size. @@ -783,28 +850,33 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { */ protected HFileBlock readNextDataBlock() throws IOException { long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset(); - if (block == null) + if (curBlock == null) return null; - HFileBlock curBlock = block; + HFileBlock block = this.curBlock; do { - if (curBlock.getOffset() >= lastDataBlockOffset) + if (block.getOffset() >= lastDataBlockOffset) return null; - if (curBlock.getOffset() < 0) { + if (block.getOffset() < 0) { throw new IOException("Invalid block file offset: " + block); } // We are reading the next block without block type validation, because // it might turn out to be a non-data block. - curBlock = reader.readBlock(curBlock.getOffset() - + curBlock.getOnDiskSizeWithHeader(), - curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread, + block = reader.readBlock(block.getOffset() + + block.getOnDiskSizeWithHeader(), + block.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread, isCompaction, true, null, getEffectiveDataBlockEncoding()); - } while (!curBlock.getBlockType().isData()); + if (block != null && !block.getBlockType().isData()) { + // Whatever block we read we will be returning it unless + // it is a datablock. Just in case the blocks are non data blocks + reader.returnBlock(block); + } + } while (!block.getBlockType().isData()); - return curBlock; + return block; } public DataBlockEncoding getEffectiveDataBlockEncoding() { @@ -817,13 +889,27 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { return null; KeyValue ret; + // TODO : reduce the varieties of KV here. 
Check if based on a boolean + // we can handle the 'no tags' case + // TODO : Handle MBB here if (currTagsLen > 0) { - ret = new SizeCachedKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() - + blockBuffer.position(), getCellBufSize()); + if (this.curBlock.getMemoryType() == MemoryType.SHARED) { + ret = new ShareableMemoryKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position(), getCellBufSize()); + } else { + ret = new SizeCachedKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position(), getCellBufSize()); + } } else { - ret = new SizeCachedNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + if (this.curBlock.getMemoryType() == MemoryType.SHARED) { + ret = new ShareableMemoryNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + + blockBuffer.position(), getCellBufSize()); + } else { + ret = new SizeCachedNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() + blockBuffer.position(), getCellBufSize()); + } } + if (this.reader.shouldIncludeMemstoreTS()) { ret.setSequenceId(currMemstoreTS); } @@ -838,6 +924,32 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { + KEY_VALUE_LEN_SIZE, currKeyLen); } + private static class ShareableMemoryKeyValue extends SizeCachedKeyValue implements + ShareableMemory { + public ShareableMemoryKeyValue(byte[] bytes, int offset, int length) { + super(bytes, offset, length); + } + + @Override + public Cell cloneToCell() { + byte[] copy = Bytes.copy(this.bytes, this.offset, this.length); + return new SizeCachedKeyValue(copy, 0, copy.length); + } + } + + private static class ShareableMemoryNoTagsKeyValue extends SizeCachedNoTagsKeyValue implements + ShareableMemory { + public ShareableMemoryNoTagsKeyValue(byte[] bytes, int offset, int length) { + super(bytes, offset, length); + } + + @Override + public Cell cloneToCell() { + byte[] copy = Bytes.copy(this.bytes, this.offset, this.length); + return new SizeCachedNoTagsKeyValue(copy, 0, 
copy.length); + } + } + @Override public ByteBuffer getValue() { assertSeeked(); @@ -849,7 +961,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } protected void setNonSeekedState() { - block = null; + reset(); blockBuffer = null; currKeyLen = 0; currValueLen = 0; @@ -869,7 +981,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { + "; currKeyLen = " + currKeyLen + "; currValLen = " + currValueLen + "; block limit = " + blockBuffer.limit() + "; HFile name = " + reader.getName() - + "; currBlock currBlockOffset = " + block.getOffset()); + + "; currBlock currBlockOffset = " + this.curBlock.getOffset()); throw e; } } @@ -882,7 +994,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { private boolean positionForNextBlock() throws IOException { // Methods are small so they get inlined because they are 'hot'. long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset(); - if (block.getOffset() >= lastDataBlockOffset) { + if (this.curBlock.getOffset() >= lastDataBlockOffset) { setNonSeekedState(); return false; } @@ -897,7 +1009,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { setNonSeekedState(); return false; } - updateCurrBlock(nextBlock); + updateCurrentBlock(nextBlock); return true; } @@ -946,27 +1058,37 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { return false; } - long firstDataBlockOffset = - reader.getTrailer().getFirstDataBlockOffset(); - if (block != null && block.getOffset() == firstDataBlockOffset) { - blockBuffer.rewind(); - readKeyValueLen(); - return true; + long firstDataBlockOffset = reader.getTrailer().getFirstDataBlockOffset(); + if (curBlock != null + && curBlock.getOffset() == firstDataBlockOffset) { + return processFirstDataBlock(); } - block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, + readAndUpdateNewBlock(firstDataBlockOffset); + return true; + } + + protected boolean processFirstDataBlock() 
throws IOException{ + blockBuffer.rewind(); + readKeyValueLen(); + return true; + } + + protected void readAndUpdateNewBlock(long firstDataBlockOffset) throws IOException, + CorruptHFileException { + HFileBlock newBlock = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); - if (block.getOffset() < 0) { - throw new IOException("Invalid block offset: " + block.getOffset()); + if (newBlock.getOffset() < 0) { + throw new IOException("Invalid block offset: " + newBlock.getOffset()); } - updateCurrBlock(block); - return true; + updateCurrentBlock(newBlock); } protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, boolean rewind, Cell key, boolean seekBefore) throws IOException { - if (block == null || block.getOffset() != seekToBlock.getOffset()) { - updateCurrBlock(seekToBlock); + if (this.curBlock == null + || this.curBlock.getOffset() != seekToBlock.getOffset()) { + updateCurrentBlock(seekToBlock); } else if (rewind) { blockBuffer.rewind(); } @@ -989,10 +1111,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { */ protected final void checkKeyValueLen() { if (checkLen(this.currKeyLen) || checkLen(this.currValueLen)) { - throw new IllegalStateException("Invalid currKeyLen " + this.currKeyLen + - " or currValueLen " + this.currValueLen + ". Block offset: " + block.getOffset() + - ", block length: " + this.blockBuffer.limit() + ", position: " + - this.blockBuffer.position() + " (without header)."); + throw new IllegalStateException("Invalid currKeyLen " + this.currKeyLen + + " or currValueLen " + this.currValueLen + ". 
Block offset: " + + this.curBlock.getOffset() + ", block length: " + + this.blockBuffer.limit() + ", position: " + this.blockBuffer.position() + + " (without header)."); } } @@ -1002,19 +1125,18 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { * * @param newBlock the block to make current */ - protected void updateCurrBlock(HFileBlock newBlock) { - block = newBlock; - + protected void updateCurrentBlock(HFileBlock newBlock) throws IOException { + // Set the active block on the reader // sanity check - if (block.getBlockType() != BlockType.DATA) { - throw new IllegalStateException("Scanner works only on data " + - "blocks, got " + block.getBlockType() + "; " + - "fileName=" + reader.getName() + ", " + - "dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " + - "isCompaction=" + isCompaction); + if (newBlock.getBlockType() != BlockType.DATA) { + throw new IllegalStateException("ScannerV2 works only on data " + "blocks, got " + + newBlock.getBlockType() + "; " + "fileName=" + reader.getName() + + ", " + "dataBlockEncoder=" + reader.getDataBlockEncoding() + ", " + "isCompaction=" + + isCompaction); } - blockBuffer = block.getBufferWithoutHeader(); + updateCurrBlockRef(newBlock); + blockBuffer = newBlock.getBufferWithoutHeader(); readKeyValueLen(); blockFetches++; @@ -1058,13 +1180,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } @Override - public void close() { - // HBASE-12295 will add code here. - } - - @Override public void shipped() throws IOException { - // HBASE-12295 will add code here. 
+ this.returnBlocks(false); } } @@ -1127,8 +1244,13 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { updateCacheMetrics); if (cachedBlock != null) { if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) { - cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader); - } + HFileBlock compressedBlock = cachedBlock; + cachedBlock = compressedBlock.unpack(hfileContext, fsBlockReader); + // In case of compressed block after unpacking we can return the compressed block + if (compressedBlock != cachedBlock) { + cache.returnBlock(cacheKey, compressedBlock); + } + } validateBlockType(cachedBlock, expectedBlockType); if (expectedDataBlockEncoding == null) { @@ -1163,6 +1285,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { " because of a data block encoding mismatch" + "; expected: " + expectedDataBlockEncoding + ", actual: " + actualDataBlockEncoding); + // This is an error scenario. so here we need to decrement the + // count. + cache.returnBlock(cacheKey, cachedBlock); cache.evictBlock(cacheKey); } return null; @@ -1180,7 +1305,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { * @throws IOException */ @Override - public ByteBuff getMetaBlock(String metaBlockName, boolean cacheBlock) + public HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException { if (trailer.getMetaIndexCount() == 0) { return null; // there are no meta blocks @@ -1213,7 +1338,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { assert cachedBlock.isUnpacked() : "Packed block leak."; // Return a distinct 'shallow copy' of the block, // so pos does not get messed by the scanner - return cachedBlock.getBufferWithoutHeader(); + return cachedBlock; } // Cache Miss, please load. 
} @@ -1227,7 +1352,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1()); } - return metaBlock.getBufferWithoutHeader(); + return metaBlock; } } @@ -1424,7 +1549,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { @Override public boolean isSeeked(){ - return this.block != null; + return curBlock != null; + } + + public void setNonSeekedState() { + reset(); } /** @@ -1434,21 +1563,21 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { * @param newBlock the block to make current * @throws CorruptHFileException */ - private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException { - block = newBlock; + @Override + protected void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException { // sanity checks - if (block.getBlockType() != BlockType.ENCODED_DATA) { - throw new IllegalStateException( - "EncodedScanner works only on encoded data blocks"); + if (newBlock.getBlockType() != BlockType.ENCODED_DATA) { + throw new IllegalStateException("EncodedScanner works only on encoded data blocks"); } - short dataBlockEncoderId = block.getDataBlockEncodingId(); + short dataBlockEncoderId = newBlock.getDataBlockEncodingId(); if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) { String encoderCls = dataBlockEncoder.getClass().getName(); throw new CorruptHFileException("Encoder " + encoderCls + " doesn't support data block encoding " + DataBlockEncoding.getNameFromId(dataBlockEncoderId)); } + updateCurrBlockRef(newBlock); ByteBuff encodedBuffer = getEncodedBuffer(newBlock); seeker.setCurrentBuffer(encodedBuffer); blockFetches++; @@ -1467,29 +1596,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } @Override - public boolean seekTo() throws IOException { - if (reader == null) { - return false; - } - - if (reader.getTrailer().getEntryCount() == 0) { - // No data blocks. 
- return false; - } - - long firstDataBlockOffset = - reader.getTrailer().getFirstDataBlockOffset(); - if (block != null && block.getOffset() == firstDataBlockOffset) { - seeker.rewind(); - return true; - } - - block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread, - isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); - if (block.getOffset() < 0) { - throw new IOException("Invalid block offset: " + block.getOffset()); - } - updateCurrentBlock(block); + protected boolean processFirstDataBlock() throws IOException { + seeker.rewind(); return true; } @@ -1497,10 +1605,12 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { public boolean next() throws IOException { boolean isValid = seeker.next(); if (!isValid) { - block = readNextDataBlock(); - isValid = block != null; + HFileBlock newBlock = readNextDataBlock(); + isValid = newBlock != null; if (isValid) { - updateCurrentBlock(block); + updateCurrentBlock(newBlock); + } else { + setNonSeekedState(); } } return isValid; @@ -1520,7 +1630,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { @Override public Cell getCell() { - if (block == null) { + if (this.curBlock == null) { return null; } return seeker.getCell(); @@ -1539,7 +1649,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } private void assertValidSeek() { - if (block == null) { + if (this.curBlock == null) { throw new NotSeekedException(); } } @@ -1548,9 +1658,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { return dataBlockEncoder.getFirstKeyCellInBlock(getEncodedBuffer(curBlock)); } + @Override protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, boolean rewind, Cell key, boolean seekBefore) throws IOException { - if (block == null || block.getOffset() != seekToBlock.getOffset()) { + if (this.curBlock == null + || this.curBlock.getOffset() != seekToBlock.getOffset()) { updateCurrentBlock(seekToBlock); } 
else if (rewind) { seeker.rewind(); @@ -1631,6 +1743,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { HFileContextBuilder builder = new HFileContextBuilder() .withIncludesMvcc(shouldIncludeMemstoreTS()) .withHBaseCheckSum(true) + .withHFileName(this.getName()) .withCompression(this.compressAlgo); // Check for any key material available http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 806ddc9..dfbdc05 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -34,11 +34,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; -import com.google.common.base.Objects; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; @@ -49,6 +48,7 @@ import org.apache.hadoop.util.StringUtils; import org.codehaus.jackson.annotate.JsonIgnoreProperties; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -1090,4 +1090,9 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { public BlockCache[] getBlockCaches() { return null; } + + 
@Override + public void returnBlock(BlockCacheKey cacheKey, Cacheable block) { + // There is no SHARED type here. Just return + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java index f12f3b4..1eb7bfd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.util.Addressing; @@ -259,7 +260,8 @@ public class MemcachedBlockCache implements BlockCache { public HFileBlock decode(CachedData d) { try { ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData())); - return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true); + return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true, + MemoryType.EXCLUSIVE); } catch (IOException e) { LOG.warn("Error deserializing data from memcached",e); } @@ -272,4 +274,9 @@ public class MemcachedBlockCache implements BlockCache { } } + @Override + public void returnBlock(BlockCacheKey cacheKey, Cacheable block) { + // Not doing reference counting. 
All blocks here are EXCLUSIVE + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index f05a255..1c331a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -58,16 +59,17 @@ import org.apache.hadoop.hbase.io.hfile.BlockPriority; import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.CacheStats; import org.apache.hadoop.hbase.io.hfile.Cacheable; +import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; import org.apache.hadoop.hbase.io.hfile.CachedBlock; import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.nio.ByteBuff; -import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.util.ConcurrentIndex; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; 
@@ -421,20 +423,19 @@ public class BucketCache implements BlockCache, HeapSize { // TODO : change this area - should be removed after server cells and // 12295 are available int len = bucketEntry.getLength(); - ByteBuffer buf = ByteBuffer.allocate(len); - int lenRead = ioEngine.read(buf, bucketEntry.offset()); - ByteBuff bb = new SingleByteBuff(buf); - if (lenRead != len) { - throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected"); - } + Pair<ByteBuff, MemoryType> pair = ioEngine.read(bucketEntry.offset(), len); + ByteBuff bb = pair.getFirst(); CacheableDeserializer<Cacheable> deserializer = bucketEntry.deserializerReference(this.deserialiserMap); - Cacheable cachedBlock = deserializer.deserialize(bb, true); + Cacheable cachedBlock = deserializer.deserialize(bb, true, pair.getSecond()); long timeTaken = System.nanoTime() - start; if (updateCacheMetrics) { cacheStats.hit(caching); cacheStats.ioHit(timeTaken); } + if (pair.getSecond() == MemoryType.SHARED) { + bucketEntry.refCount.incrementAndGet(); + } bucketEntry.access(accessCount.incrementAndGet()); if (this.ioErrorStartTime > 0) { ioErrorStartTime = -1; @@ -468,14 +469,59 @@ public class BucketCache implements BlockCache, HeapSize { @Override public boolean evictBlock(BlockCacheKey cacheKey) { + return evictBlock(cacheKey, true); + } + + // does not check for the ref count. 
Just tries to evict it if found in the + // bucket map + private boolean forceEvict(BlockCacheKey cacheKey) { if (!cacheEnabled) { return false; } + RAMQueueEntry removedBlock = checkRamCache(cacheKey); + BucketEntry bucketEntry = backingMap.get(cacheKey); + if (bucketEntry == null) { + if (removedBlock != null) { + cacheStats.evicted(0); + return true; + } else { + return false; + } + } + IdLock.Entry lockEntry = null; + try { + lockEntry = offsetLock.getLockEntry(bucketEntry.offset()); + if (backingMap.remove(cacheKey, bucketEntry)) { + blockEvicted(cacheKey, bucketEntry, removedBlock == null); + } else { + return false; + } + } catch (IOException ie) { + LOG.warn("Failed evicting block " + cacheKey); + return false; + } finally { + if (lockEntry != null) { + offsetLock.releaseLockEntry(lockEntry); + } + } + cacheStats.evicted(bucketEntry.getCachedTime()); + return true; + } + + private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) { RAMQueueEntry removedBlock = ramCache.remove(cacheKey); if (removedBlock != null) { this.blockNumber.decrementAndGet(); this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize()); } + return removedBlock; + } + + public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) { + if (!cacheEnabled) { + return false; + } + RAMQueueEntry removedBlock = checkRamCache(cacheKey); BucketEntry bucketEntry = backingMap.get(cacheKey); if (bucketEntry == null) { if (removedBlock != null) { @@ -488,10 +534,28 @@ public class BucketCache implements BlockCache, HeapSize { IdLock.Entry lockEntry = null; try { lockEntry = offsetLock.getLockEntry(bucketEntry.offset()); - if (backingMap.remove(cacheKey, bucketEntry)) { - blockEvicted(cacheKey, bucketEntry, removedBlock == null); + int refCount = bucketEntry.refCount.get(); + if(refCount == 0) { + if (backingMap.remove(cacheKey, bucketEntry)) { + blockEvicted(cacheKey, bucketEntry, removedBlock == null); + } else { + return false; + } } else { - return false; + if(!deletedBlock) { + 
if (LOG.isDebugEnabled()) { + LOG.debug("This block " + cacheKey + " is still referred by " + refCount + + " readers. Can not be freed now"); + } + return false; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("This block " + cacheKey + " is still referred by " + refCount + + " readers. Can not be freed now. Hence will mark this" + + " for evicting at a later point"); + } + bucketEntry.markedForEvict = true; + } } } catch (IOException ie) { LOG.warn("Failed evicting block " + cacheKey); @@ -1107,6 +1171,10 @@ public class BucketCache implements BlockCache, HeapSize { byte deserialiserIndex; private volatile long accessCounter; private BlockPriority priority; + // Set this when we were not able to forcefully evict the block + private volatile boolean markedForEvict; + private AtomicInteger refCount = new AtomicInteger(0); + /** * Time this block was cached. Presumes we are created just before we are added to the cache. */ @@ -1198,9 +1266,12 @@ public class BucketCache implements BlockCache, HeapSize { public long free(long toFree) { Map.Entry<BlockCacheKey, BucketEntry> entry; long freedBytes = 0; + // TODO avoid a cycling situation. We find no block which is not in use and so no way to free + // What to do then? Caching attempt fail? Need some changes in cacheBlock API? 
while ((entry = queue.pollLast()) != null) { - evictBlock(entry.getKey()); - freedBytes += entry.getValue().getLength(); + if (evictBlock(entry.getKey(), false)) { + freedBytes += entry.getValue().getLength(); + } if (freedBytes >= toFree) { return freedBytes; } @@ -1404,4 +1475,26 @@ public class BucketCache implements BlockCache, HeapSize { public BlockCache[] getBlockCaches() { return null; } + + @Override + public void returnBlock(BlockCacheKey cacheKey, Cacheable block) { + if (block.getMemoryType() == MemoryType.SHARED) { + BucketEntry bucketEntry = backingMap.get(cacheKey); + if (bucketEntry != null) { + int refCount = bucketEntry.refCount.decrementAndGet(); + if (bucketEntry.markedForEvict && refCount == 0) { + forceEvict(cacheKey); + } + } + } + } + + @VisibleForTesting + public int getRefCount(BlockCacheKey cacheKey) { + BucketEntry bucketEntry = backingMap.get(cacheKey); + if (bucketEntry != null) { + return bucketEntry.refCount.get(); + } + return 0; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java index 03c65de..092234b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java @@ -22,8 +22,11 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.util.ByteBufferArray; +import 
org.apache.hadoop.hbase.util.Pair; /** * IO engine that stores data in memory using an array of ByteBuffers @@ -64,24 +67,24 @@ public class ByteBufferIOEngine implements IOEngine { return false; } - /** - * Transfers data from the buffer array to the given byte buffer - * @param dstBuffer the given byte buffer into which bytes are to be written - * @param offset The offset in the ByteBufferArray of the first byte to be - * read - * @return number of bytes read - * @throws IOException - */ - @Override - public int read(ByteBuffer dstBuffer, long offset) throws IOException { - assert dstBuffer.hasArray(); - return bufferArray.getMultiple(offset, dstBuffer.remaining(), dstBuffer.array(), - dstBuffer.arrayOffset()); - } - @Override - public ByteBuff read(long offset, int len) throws IOException { - return bufferArray.asSubByteBuff(offset, len); + public Pair<ByteBuff, MemoryType> read(long offset, int length) throws IOException { + // TODO : this allocate and copy will go away once we create BB backed cells + ByteBuffer dstBuffer = ByteBuffer.allocate(length); + bufferArray.getMultiple(offset, dstBuffer.remaining(), dstBuffer.array(), + dstBuffer.arrayOffset()); + // Here the buffer that is created directly refers to the buffer in the actual buckets. + // When any cell is referring to the blocks created out of these buckets then it means that + // those cells are referring to a shared memory area which if evicted by the BucketCache would + // lead to corruption of results. 
Hence we set the type of the buffer as MemoryType.SHARED + // so that the readers using this block are aware of this fact and do the necessary action + // to prevent eviction till the results are either consumed or copied + if (dstBuffer.limit() != length) { + throw new RuntimeException("Only " + dstBuffer.limit() + " bytes read, " + length + + " expected"); + } + // TODO : to be removed - make it conditional + return new Pair<ByteBuff, MemoryType>(new SingleByteBuff(dstBuffer), MemoryType.SHARED); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index b1960c4..db589ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -26,8 +26,10 @@ import java.nio.channels.FileChannel; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.StringUtils; /** @@ -81,14 +83,25 @@ public class FileIOEngine implements IOEngine { /** * Transfers data from file to the given byte buffer - * @param dstBuffer the given byte buffer into which bytes are to be written * @param offset The offset in the file where the first byte to be read + * @param length The length of buffer that should be allocated for reading + * from the file channel * @return number of bytes read * @throws 
IOException */ @Override - public int read(ByteBuffer dstBuffer, long offset) throws IOException { - return fileChannel.read(dstBuffer, offset); + public Pair<ByteBuff, MemoryType> read(long offset, int length) throws IOException { + ByteBuffer dstBuffer = ByteBuffer.allocate(length); + fileChannel.read(dstBuffer, offset); + // The buffer created out of the fileChannel is formed by copying the data from the file + // Hence in this case there is no shared memory that we point to. Even if the BucketCache evicts + // this buffer from the file the data is already copied and there is no need to ensure that + // the results are not corrupted before consuming them. + if (dstBuffer.limit() != length) { + throw new RuntimeException("Only " + dstBuffer.limit() + " bytes read, " + length + + " expected"); + } + return new Pair<ByteBuff, MemoryType>(new SingleByteBuff(dstBuffer), MemoryType.EXCLUSIVE); } /** @@ -129,14 +142,6 @@ public class FileIOEngine implements IOEngine { } @Override - public ByteBuff read(long offset, int len) throws IOException { - ByteBuffer dstBuffer = ByteBuffer.allocate(len); - int read = read(dstBuffer, offset); - dstBuffer.limit(read); - return new SingleByteBuff(dstBuffer); - } - - @Override public void write(ByteBuff srcBuffer, long offset) throws IOException { // When caching block into BucketCache there will be single buffer backing for this HFileBlock. 
assert srcBuffer.hasArray(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java index 862042f..3efb41b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.util.Pair; /** * A class implementing IOEngine interface supports data services for @@ -36,25 +38,14 @@ public interface IOEngine { boolean isPersistent(); /** - * Transfers data from IOEngine to the given byte buffer - * @param dstBuffer the given byte buffer into which bytes are to be written + * Transfers data from IOEngine to a byte buffer + * @param length How many bytes to be read from the offset * @param offset The offset in the IO engine where the first byte to be read - * @return number of bytes read + * @return Pair of ByteBuffer where data is read and its MemoryType ({@link MemoryType}) * @throws IOException * @throws RuntimeException when the length of the ByteBuff read is less than 'len' */ - int read(ByteBuffer dstBuffer, long offset) throws IOException; - - /** - * Transfers data from IOEngine at the given offset to an MultiByteBuffer - * @param offset the offset from which the underlying buckets should be read - * @param len the length upto which the buckets should be read - * @return the MultiByteBuffer formed from the underlying ByteBuffers forming the - * buckets 
- * @throws IOException - * @throws RuntimeException when the length of the ByteBuff read is less than 'len' - */ - ByteBuff read(long offset, int len) throws IOException; + Pair<ByteBuff, MemoryType> read(long offset, int length) throws IOException; /** * Transfers data from the given byte buffer to IOEngine
