Repository: hive Updated Branches: refs/heads/master 31077be9b -> 4df092674
HIVE-17613 : remove object pools for short, same-thread allocations (Sergey Shelukhin, reviewed by Gopal Vijayaraghavan) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4df09267 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4df09267 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4df09267 Branch: refs/heads/master Commit: 4df09267441ef6e65108f69f9ac6b3ba18768ab2 Parents: 31077be Author: sergey <[email protected]> Authored: Wed Oct 4 14:56:33 2017 -0700 Committer: sergey <[email protected]> Committed: Wed Oct 4 14:57:46 2017 -0700 ---------------------------------------------------------------------- .../llap/io/encoded/SerDeEncodedDataReader.java | 15 +---- .../hive/llap/cache/TestLowLevelCacheImpl.java | 4 +- .../hadoop/hive/llap/LlapCacheAwareFs.java | 4 +- .../hive/ql/io/orc/encoded/CacheChunk.java | 12 +--- .../ql/io/orc/encoded/EncodedReaderImpl.java | 69 ++++---------------- 5 files changed, 16 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/4df09267/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index 943ac6e..599b519 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -129,23 +129,10 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> t.reset(); } }); - public static final FixedSizedObjectPool<CacheChunk> TCC_POOL = - new FixedSizedObjectPool<>(1024, new PoolObjectHelper<CacheChunk>() { - @Override - public CacheChunk create() { - return new CacheChunk(); - } - @Override - public void resetBeforeOffer(CacheChunk t) { - t.reset(); - } - }); private final static DiskRangeListFactory CC_FACTORY = new DiskRangeListFactory() { @Override public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) { - CacheChunk tcc = TCC_POOL.take(); - tcc.init(buffer, offset, end); - return tcc; + return new CacheChunk(buffer, offset, end); } }; http://git-wip-us.apache.org/repos/asf/hive/blob/4df09267/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java index ab10285..3320351 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java @@ -48,9 +48,7 @@ public class TestLowLevelCacheImpl { private static final DiskRangeListFactory testFactory = new DiskRangeListFactory() { public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) { - CacheChunk cc = new CacheChunk(); - cc.init(buffer, offset, end); - return cc; + return new CacheChunk(buffer, offset, end); } }; http://git-wip-us.apache.org/repos/asf/hive/blob/4df09267/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java index 5c1eed3..626aeb9 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java @@ -204,9 +204,7 @@ public class LlapCacheAwareFs extends FileSystem { @Override public DiskRangeList createCacheChunk( MemoryBuffer buffer, long startOffset, long endOffset) { - CacheChunk result = new CacheChunk(); // TODO: pool? - result.init(buffer, startOffset, endOffset); - return result; + return new CacheChunk(buffer, startOffset, endOffset); } }, gotAllData); if (LOG.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/hive/blob/4df09267/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java index 16fdbf7..4eedca1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java @@ -35,15 +35,11 @@ import com.google.common.annotations.VisibleForTesting; public class CacheChunk extends DiskRangeList { protected MemoryBuffer buffer; - public CacheChunk() { - super(-1, -1); - } - - public void init(MemoryBuffer buffer, long offset, long end) { + public CacheChunk(MemoryBuffer buffer, long offset, long end) { + super(offset, end); this.buffer = buffer; this.offset = offset; this.end = end; - this.next = this.prev = null; // Just in case. } @Override @@ -81,10 +77,6 @@ public class CacheChunk extends DiskRangeList { throw new UnsupportedOperationException(); } - public void reset() { - init(null, -1, -1); - } - public void adjustEnd(long l) { this.end += l; } http://git-wip-us.apache.org/repos/asf/hive/blob/4df09267/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 31d5dd3..80b7be8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -112,19 +112,15 @@ class EncodedReaderImpl implements EncodedReader { private static final Object POOLS_CREATION_LOCK = new Object(); private static Pools POOLS; private static class Pools { - Pool<CacheChunk> tccPool; - Pool<ProcCacheChunk> pccPool; Pool<OrcEncodedColumnBatch> ecbPool; Pool<ColumnStreamData> csdPool; } private final static DiskRangeListFactory CC_FACTORY = new DiskRangeListFactory() { - @Override - public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) { - CacheChunk tcc = POOLS.tccPool.take(); - tcc.init(buffer, offset, end); - return tcc; - } - }; + @Override + public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) { + return new CacheChunk(buffer, offset, end); + } + }; private final Object fileKey; private final DataReader dataReader; private boolean isDataReaderOpen = false; @@ -517,7 +513,6 @@ class EncodedReaderImpl implements EncodedReader { LOG.error("Error during the cleanup after another error; ignoring", t); } } - releaseCacheChunksIntoObjectPool(toRead.next); } private static int countMaxStreams(Area area) { @@ -677,8 +672,7 @@ class EncodedReaderImpl implements EncodedReader { private int count; public UncompressedCacheChunk(BufferChunk bc) { - super(); - init(null, bc.getOffset(), bc.getEnd()); + super(null, bc.getOffset(), bc.getEnd()); chunk = bc; count = 1; } @@ -720,21 +714,15 @@ class EncodedReaderImpl implements EncodedReader { * the DiskRange list created for the request, and everyone treats it like regular CacheChunk. */ private static class ProcCacheChunk extends CacheChunk { - public void init(long cbStartOffset, long cbEndOffset, boolean isCompressed, + public ProcCacheChunk(long cbStartOffset, long cbEndOffset, boolean isCompressed, ByteBuffer originalData, MemoryBuffer targetBuffer, int originalCbIndex) { - super.init(targetBuffer, cbStartOffset, cbEndOffset); + super(targetBuffer, cbStartOffset, cbEndOffset); this.isOriginalDataCompressed = isCompressed; this.originalData = originalData; this.originalCbIndex = originalCbIndex; } @Override - public void reset() { - super.reset(); - this.originalData = null; - } - - @Override public String toString() { return super.toString() + ", original is set " + (this.originalData != null) + ", buffer was replaced " + (originalCbIndex == -1); @@ -1177,8 +1165,7 @@ class EncodedReaderImpl implements EncodedReader { MemoryBuffer buffer = singleAlloc[0]; cacheWrapper.reuseBuffer(buffer); ByteBuffer dest = buffer.getByteBufferRaw(); - CacheChunk tcc = POOLS.tccPool.take(); - tcc.init(buffer, partOffset, candidateEnd); + CacheChunk tcc = new CacheChunk(buffer, partOffset, candidateEnd); copyAndReplaceUncompressedChunks(candidateCached, dest, tcc, false); return tcc; } @@ -1192,8 +1179,7 @@ class EncodedReaderImpl implements EncodedReader { MemoryBuffer buffer = singleAlloc[0]; cacheWrapper.reuseBuffer(buffer); ByteBuffer dest = buffer.getByteBufferRaw(); - CacheChunk tcc = POOLS.tccPool.take(); - tcc.init(buffer, bc.getOffset(), bc.getEnd()); + CacheChunk tcc = new CacheChunk(buffer, bc.getOffset(), bc.getEnd()); copyUncompressedChunk(bc.getChunk(), dest); bc.replaceSelfWith(tcc); return tcc; @@ -1238,17 +1224,6 @@ class EncodedReaderImpl implements EncodedReader { } } - public static void releaseCacheChunksIntoObjectPool(DiskRangeList current) { - while (current != null) { - if (current instanceof ProcCacheChunk) { - POOLS.pccPool.offer((ProcCacheChunk)current); - } else if (current instanceof CacheChunk) { - POOLS.tccPool.offer((CacheChunk)current); - } - current = current.next; - } - } - private void ponderReleaseInitialRefcount( long unlockUntilCOffset, long streamStartOffset, CacheChunk cc) { // Don't release if the buffer contains any data beyond the acceptable boundary. @@ -1655,8 +1630,7 @@ class EncodedReaderImpl implements EncodedReader { // Add it to result in order we are processing. cacheBuffers.add(futureAlloc); // Add it to the list of work to decompress. - ProcCacheChunk cc = POOLS.pccPool.take(); - cc.init(cbStartOffset, cbEndOffset, !isUncompressed, + ProcCacheChunk cc = new ProcCacheChunk(cbStartOffset, cbEndOffset, !isUncompressed, fullCompressionBlock, futureAlloc, cacheBuffers.size() - 1); toDecompress.add(cc); // Adjust the compression block position. @@ -1690,26 +1664,6 @@ class EncodedReaderImpl implements EncodedReader { private static Pools createPools(PoolFactory pf) { Pools pools = new Pools(); - pools.pccPool = pf.createPool(1024, new PoolObjectHelper<ProcCacheChunk>() { - @Override - public ProcCacheChunk create() { - return new ProcCacheChunk(); - } - @Override - public void resetBeforeOffer(ProcCacheChunk t) { - t.reset(); - } - }); - pools.tccPool = pf.createPool(1024, new PoolObjectHelper<CacheChunk>() { - @Override - public CacheChunk create() { - return new CacheChunk(); - } - @Override - public void resetBeforeOffer(CacheChunk t) { - t.reset(); - } - }); pools.ecbPool = pf.createEncodedColumnBatchPool(); pools.csdPool = pf.createColumnStreamDataPool(); return pools; @@ -1937,7 +1891,6 @@ class EncodedReaderImpl implements EncodedReader { LOG.error("Error during the cleanup after another error; ignoring", t); } } - releaseCacheChunksIntoObjectPool(toRead.next); }
