Author: sershe Date: Sat Jan 10 02:38:17 2015 New Revision: 1650717 URL: http://svn.apache.org/r1650717 Log: Preliminary patch for low-level cache; needs a few more touches, and the LRFU policy would not be thread-safe
Added: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionListener.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java Removed: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/Allocator.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/JavaAllocator.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/FifoCachePolicy.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/LrfuCachePolicy.java Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java 
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original) +++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Jan 10 02:38:17 2015 @@ -1969,8 +1969,10 @@ public class HiveConf extends Configurat "Updates tez job execution progress in-place in the terminal."), LLAP_ENABLED("hive.llap.enabled", true, ""), - LLAP_CACHE_SIZE("hive.llap.cache.size", 1024L * 1024 * 1024, ""), - LLAP_BUFFER_SIZE("hive.llap.buffer.size", 16 * 1024 * 1024, ""), + LLAP_ORC_CACHE_MIN_ALLOC("hive.llap.cache.orc.minalloc", 128 * 1024, ""), + LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.cache.orc.minalloc", 16 * 1024 * 1024, ""), + LLAP_ORC_CACHE_ARENA_SIZE("hive.llap.cache.orc.minalloc", 128L * 1024 * 1024, ""), + LLAP_ORC_CACHE_MAX_SIZE("hive.llap.cache.orc.minalloc", 1024L * 1024 * 1024, ""), 
LLAP_REQUEST_THREAD_COUNT("hive.llap.request.thread.count", 16, ""), LLAP_USE_LRFU("hive.llap.use.lrfu", true, ""), LLAP_LRFU_LAMBDA("hive.llap.lrfu.lambda", 0.01f, "") Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java (original) +++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java Sat Jan 10 02:38:17 2015 @@ -18,10 +18,11 @@ package org.apache.hadoop.hive.llap.io.api; -import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer; - public class EncodedColumn<BatchKey> { - public EncodedColumn(BatchKey batchKey, int columnIndex, LlapBuffer columnData) { + // TODO: temporary class. Will be filled in when reading (ORC) is implemented. Need to balance + // generality, and ability to not copy data from underlying low-level cached buffers. 
+ public static class ColumnBuffer {} + public EncodedColumn(BatchKey batchKey, int columnIndex, ColumnBuffer columnData) { this.batchKey = batchKey; this.columnIndex = columnIndex; this.columnData = columnData; @@ -29,5 +30,5 @@ public class EncodedColumn<BatchKey> { public BatchKey batchKey; public int columnIndex; - public LlapBuffer columnData; + public ColumnBuffer columnData; } \ No newline at end of file Added: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java?rev=1650717&view=auto ============================================================================== --- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java (added) +++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java Sat Jan 10 02:38:17 2015 @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.io.api.cache; + +import java.nio.ByteBuffer; + +public abstract class LlapMemoryBuffer { + protected LlapMemoryBuffer(ByteBuffer byteBuffer, int offset, int length) { + initialize(byteBuffer, offset, length); + } + public void initialize(ByteBuffer byteBuffer, int offset, int length) { + this.byteBuffer = byteBuffer; + this.offset = offset; + this.length = length; + } + public ByteBuffer byteBuffer; + public int offset; + public int length; + +} \ No newline at end of file Added: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1650717&view=auto ============================================================================== --- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (added) +++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Sat Jan 10 02:38:17 2015 @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.io.api.cache; + + +public interface LowLevelCache { + + /** + * Gets file data for particular offsets. Null entries mean no data. + */ + LlapMemoryBuffer[] getFileData(String fileName, long[] offsets); + + /** + * Puts file data into cache. + * @return null if all data was put; bitmask indicating which chunks were not put otherwise; + * the replacement chunks from cache are updated directly in the array. + */ + long[] putFileData(String file, long[] offsets, LlapMemoryBuffer[] chunks); + + /** + * Releases the buffer returned by getFileData or allocateMultiple. + */ + void releaseBuffer(LlapMemoryBuffer buffer); + + /** + * Allocate dest.length new blocks of size into dest. + */ + void allocateMultiple(LlapMemoryBuffer[] dest, int size); +} Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java Sat Jan 10 02:38:17 2015 @@ -18,10 +18,10 @@ package org.apache.hadoop.hive.llap.cache; -import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer; +import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer; /** Dummy interface for now, might be different. 
*/ public interface Cache<CacheKey> { - public LlapBuffer cacheOrGet(CacheKey key, LlapBuffer value); - public LlapBuffer get(CacheKey key); + public ColumnBuffer cacheOrGet(CacheKey key, ColumnBuffer value); + public ColumnBuffer get(CacheKey key); } Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionListener.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionListener.java?rev=1650717&view=auto ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionListener.java (added) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionListener.java Sat Jan 10 02:38:17 2015 @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.cache; + +interface EvictionListener { + void notifyEvicted(LlapCacheableBuffer buffer); +} Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java?rev=1650717&view=auto ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java (added) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java Sat Jan 10 02:38:17 2015 @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.cache; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hive.llap.DebugUtils; +import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; +import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; + +public final class LlapCacheableBuffer extends LlapMemoryBuffer { + public LlapCacheableBuffer(ByteBuffer byteBuffer, int offset, int length) { + super(byteBuffer, offset, length); + } + + public String toStringForCache() { + return "[" + Integer.toHexString(hashCode()) + " " + String.format("%1$.2f", priority) + " " + + lastUpdate + " " + (isLocked() ? "!" : ".") + "]"; + } + + private static final int EVICTED_REFCOUNT = -1; + private final AtomicInteger refCount = new AtomicInteger(0); + + // TODO: Fields pertaining to cache policy. Perhaps they should live in separate object. + public double priority; + public long lastUpdate = -1; + public int indexInHeap = -1; + public boolean isLockedInHeap; // TODO#: this flag is invalid and not thread safe + + @Override + public int hashCode() { + if (this.byteBuffer == null) return 0; + return (System.identityHashCode(this.byteBuffer) * 37 + offset) * 37 + length; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof LlapCacheableBuffer)) return false; + LlapCacheableBuffer other = (LlapCacheableBuffer)obj; + // We only compare objects, and not contents of the ByteBuffer. + return byteBuffer == other.byteBuffer + && this.offset == other.offset && this.length == other.length; + } + + int lock() { + int oldRefCount = -1; + while (true) { + oldRefCount = refCount.get(); + if (oldRefCount == EVICTED_REFCOUNT) return -1; + assert oldRefCount >= 0; + if (refCount.compareAndSet(oldRefCount, oldRefCount + 1)) break; + } + return oldRefCount; + } + + public boolean isLocked() { + // Best-effort check. 
We cannot do a good check against caller thread, since + // refCount could still be > 0 if someone else locked. This is used for asserts. + return refCount.get() > 0; + } + + public boolean isInvalid() { + return refCount.get() == EVICTED_REFCOUNT; + } + + int unlock() { + int newRefCount = refCount.decrementAndGet(); + if (newRefCount < 0) { + throw new AssertionError("Unexpected refCount " + newRefCount); + } + return newRefCount; + } + + @Override + public String toString() { + return "0x" + Integer.toHexString(hashCode()); + } + + /** + * @return Whether the we can invalidate; false if locked or already evicted. + */ + boolean invalidate() { + while (true) { + int value = refCount.get(); + if (value != 0) return false; + if (refCount.compareAndSet(value, EVICTED_REFCOUNT)) break; + } + if (DebugUtils.isTraceLockingEnabled()) { + LlapIoImpl.LOG.info("Invalidated " + this + " due to eviction"); + } + return true; + } +} Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java?rev=1650717&view=auto ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java (added) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java Sat Jan 10 02:38:17 2015 @@ -0,0 +1,528 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.cache; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.DebugUtils; +import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; +import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; +import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; + +public class LowLevelBuddyCache implements LowLevelCache, EvictionListener { + private final ArrayList<arena> arenas; + private AtomicInteger newEvictions = new AtomicInteger(0); + private final Thread cleanupThread; + private final ConcurrentHashMap<String, FileCache> cache = + new ConcurrentHashMap<String, FileCache>(); + private final LowLevelCachePolicy cachePolicy; + + // Config settings + private final int minAllocLog2, maxAllocLog2, arenaSizeLog2, maxArenas; + + private final int minAllocation, maxAllocation; + private final long maxSize, arenaSize; + + public LowLevelBuddyCache(Configuration conf) { + minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC); + maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC); + arenaSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_SIZE); + maxSize = 
HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE); + if (maxSize < arenaSize || arenaSize > maxAllocation || maxAllocation < minAllocation) { + throw new AssertionError("Inconsistent sizes of cache, arena and allocations: " + + minAllocation + ", " + maxAllocation + ", " + arenaSize + ", " + maxSize); + } + if ((Integer.bitCount(minAllocation) != 1) || (Integer.bitCount(maxAllocation) != 1) + || (Long.bitCount(arenaSize) != 1) || (minAllocation == 1)) { + // TODO: technically, arena size is not required to be so; needs to be divisible by maxAlloc + throw new AssertionError("Allocation and arena sizes must be powers of two > 1: " + + minAllocation + ", " + maxAllocation + ", " + arenaSize); + } + if ((maxSize % arenaSize) > 0 || (maxSize / arenaSize) > Integer.MAX_VALUE) { + throw new AssertionError( + "Cache size not consistent with arena size: " + arenaSize + "," + maxSize); + } + minAllocLog2 = 31 - Integer.numberOfLeadingZeros(minAllocation); + maxAllocLog2 = 31 - Integer.numberOfLeadingZeros(maxAllocation); + arenaSizeLog2 = 31 - Long.numberOfLeadingZeros(arenaSize); + maxArenas = (int)(maxSize / arenaSize); + arenas = new ArrayList<arena>(maxArenas); + for (int i = 0; i < maxArenas; ++i) { + arenas.add(new arena()); + } + arenas.get(0).init(); + cachePolicy = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU) + ? new LowLevelLrfuCachePolicy(conf, minAllocation, maxSize, this) + : new LowLevelFifoCachePolicy(minAllocation, maxSize, this); + cleanupThread = new CleanupThread(); + cleanupThread.start(); + } + + // TODO: would it make sense to return buffers asynchronously? 
+ @Override + public void allocateMultiple(LlapMemoryBuffer[] dest, int size) { + assert size > 0; + int freeListIndex = 31 - Integer.numberOfLeadingZeros(size); + if (size != (1 << freeListIndex)) ++freeListIndex; // not a power of two, add one more + freeListIndex = Math.max(freeListIndex - minAllocLog2, 0); + int allocationSize = 1 << (freeListIndex + minAllocLog2); + int total = dest.length * allocationSize; + cachePolicy.reserveMemory(total); + + int ix = 0; + for (int i = 0; i < dest.length; ++i) { + if (dest[i] != null) continue; + dest[i] = new LlapCacheableBuffer(null, -1, -1); // TODO: pool of objects? + } + // TODO: instead of waiting, loop only ones we haven't tried w/tryLock? + for (arena block : arenas) { + int newIx = allocateFast(block, freeListIndex, dest, ix, allocationSize); + if (newIx == -1) break; + if (newIx == dest.length) return; + ix = newIx; + } + // Then try to split bigger blocks. + for (arena block : arenas) { + int newIx = allocateWithSplit(block, freeListIndex, dest, ix, allocationSize); + if (newIx == -1) break; + if (newIx == dest.length) return; + ix = newIx; + } + // Then try to allocate memory if we haven't allocated all the way to maxSize yet; very rare. 
+ for (arena block : arenas) { + ix = allocateWithExpand(block, freeListIndex, dest, ix, allocationSize); + if (ix == dest.length) return; + } + } + + private int allocateFast(arena block, + int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) { + if (block.data == null) return -1; // not allocated yet + FreeList freeList = block.freeLists[freeListIndex]; + freeList.lock.lock(); + try { + ix = allocateFromFreeListUnderLock(block, freeList, freeListIndex, dest, ix, size); + } finally { + freeList.lock.unlock(); + } + return ix; + } + + private int allocateWithSplit( + arena arena, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int allocationSize) { + if (arena.data == null) return -1; // not allocated yet + FreeList freeList = arena.freeLists[freeListIndex]; + int remaining = -1; + freeList.lock.lock(); + try { + ix = allocateFromFreeListUnderLock(arena, freeList, freeListIndex, dest, ix, allocationSize); + remaining = dest.length - ix; + if (remaining == 0) return ix; + } finally { + freeList.lock.unlock(); + } + int splitListIndex = freeListIndex; + byte headerData = (byte)((freeListIndex << 1) | 1); + while (remaining > 0) { + ++splitListIndex; + int splitWays = 1 << (splitListIndex - freeListIndex); + int headerStep = 1 << splitListIndex; + int lastSplitBlocksRemaining = -1, lastSplitNextHeader = -1; + FreeList splitList = arena.freeLists[splitListIndex]; + splitList.lock.lock(); + try { + int headerIx = splitList.listHead; + while (headerIx >= 0 && remaining > 0) { + int origOffset = offsetFromHeaderIndex(headerIx), offset = origOffset; + int toTake = Math.min(splitWays, remaining); + remaining -= toTake; + lastSplitBlocksRemaining = splitWays - toTake; + for (; toTake > 0; ++ix, --toTake, headerIx += headerStep, offset += allocationSize) { + arena.headers[headerIx] = headerData; + dest[ix].initialize(arena.data, offset, allocationSize); + } + lastSplitNextHeader = headerIx; + headerIx = arena.data.getInt(origOffset + 4); + 
arena.data.putLong(origOffset, -1); // overwrite list pointers for safety + } + splitList.listHead = headerIx; + } finally { + splitList.lock.unlock(); + } + if (remaining == 0) { + // We have just obtained all we needed by splitting at lastSplitBlockOffset; now + // we need to put the space remaining from that block into lower free lists. + // TODO: if we could return blocks asynchronously, we could do this + int newListIndex = freeListIndex; + while (lastSplitBlocksRemaining > 0) { + if ((lastSplitBlocksRemaining & 1) == 1) { + arena.headers[lastSplitNextHeader] = (byte)((newListIndex << 1) | 1); + int offset = offsetFromHeaderIndex(lastSplitNextHeader); + FreeList newFreeList = arena.freeLists[newListIndex]; + newFreeList.lock.lock(); + try { + arena.data.putInt(offset, -1); + arena.data.putInt(offset, newFreeList.listHead); + newFreeList.listHead = lastSplitNextHeader; + } finally { + newFreeList.lock.unlock(); + } + lastSplitNextHeader += (1 << newListIndex); + } + lastSplitBlocksRemaining >>>= 1; + ++newListIndex; + continue; + } + } + } + return ix; + } + + public int offsetFromHeaderIndex(int lastSplitNextHeader) { + return lastSplitNextHeader << minAllocLog2; + } + + public int allocateFromFreeListUnderLock(arena block, FreeList freeList, + int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) { + int current = freeList.listHead; + while (current >= 0 && ix < dest.length) { + int offset = offsetFromHeaderIndex(current); + block.headers[current] = (byte)((freeListIndex << 1) | 1); + current = block.data.getInt(offset + 4); + dest[ix].initialize(block.data, offset, size); + block.data.putLong(offset, -1); // overwrite list pointers for safety + ++ix; + } + freeList.listHead = current; + return ix; + } + + private int allocateWithExpand( + arena block, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) { + if (block.data != null) return ix; // already allocated + synchronized (block) { + // Never goes from non-null to null, so this is the 
only place we need sync. + if (block.data == null) { + block.init(); + } + } + return allocateWithSplit(block, freeListIndex, dest, ix, size); + } + + @Override + public LlapMemoryBuffer[] getFileData(String fileName, long[] offsets) { + LlapMemoryBuffer[] result = null; + // TODO: string must be internalized + FileCache subCache = cache.get(fileName); + if (subCache == null || !subCache.incRef()) return result; + try { + for (int i = 0; i < offsets.length; ++i) { + while (true) { // Overwhelmingly only runs once. + long offset = offsets[i]; + LlapCacheableBuffer buffer = subCache.cache.get(offset); + if (buffer == null) break; + if (lockBuffer(buffer)) { + if (result == null) { + result = new LlapCacheableBuffer[offsets.length]; + } + result[i] = buffer; + break; + } + if (subCache.cache.remove(offset, buffer)) break; + } + } + } finally { + subCache.decRef(); + } + return result; + } + + private boolean lockBuffer(LlapCacheableBuffer buffer) { + int rc = buffer.lock(); + if (rc == 0) { + cachePolicy.notifyLock(buffer); + } + return rc >= 0; + } + + @Override + public long[] putFileData(String fileName, long[] offsets, LlapMemoryBuffer[] buffers) { + long[] result = null; + assert buffers.length == offsets.length; + // TODO: string must be internalized + FileCache subCache = getOrAddFileSubCache(fileName); + try { + for (int i = 0; i < offsets.length; ++i) { + LlapCacheableBuffer buffer = (LlapCacheableBuffer)buffers[i]; + long offset = offsets[i]; + assert buffer.isLocked(); + while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value). + LlapCacheableBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer); + if (oldVal == null) break; // Cached successfully. 
+ if (DebugUtils.isTraceCachingEnabled()) { + LlapIoImpl.LOG.info("Trying to cache when the chunk is already cached for " + + fileName + "@" + offset + "; old " + oldVal + ", new " + buffer); + } + if (lockBuffer(oldVal)) { + // We found an old, valid block for this key in the cache. + releaseBufferInternal(buffer); + buffers[i] = oldVal; + if (result == null) { + result = new long[align64(buffers.length) >>> 6]; + } + result[i >>> 6] |= (1 << (i & 63)); // indicate that we've replaced the value + break; + } + // We found some old value but couldn't lock it; remove it. + subCache.cache.remove(offset, oldVal); + } + } + } finally { + subCache.decRef(); + } + return result; + } + + /** + * All this mess is necessary because we want to be able to remove sub-caches for fully + * evicted files. It may actually be better to have non-nested map with object keys? + */ + public FileCache getOrAddFileSubCache(String fileName) { + FileCache newSubCache = null; + while (true) { // Overwhelmingly executes once. + FileCache subCache = cache.get(fileName); + if (subCache != null) { + if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it. + if (newSubCache == null) { + newSubCache = new FileCache(); + newSubCache.incRef(); + } + // Found a stale value we cannot incRef; try to replace it with new value. + if (cache.replace(fileName, subCache, newSubCache)) return newSubCache; + continue; // Someone else replaced/removed a stale value, try again. + } + // No value found. + if (newSubCache == null) { + newSubCache = new FileCache(); + newSubCache.incRef(); + } + FileCache oldSubCache = cache.putIfAbsent(fileName, newSubCache); + if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache. + if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel. + // Someone created one in parallel and then it went stale. 
+      if (cache.replace(fileName, oldSubCache, newSubCache)) return newSubCache;
+      // Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
+    }
+  }
+
+  /** Rounds number up to the next multiple of 64; multiples of 64 are returned unchanged. */
+  private static int align64(int number) {
+    return ((number + 63) & ~63);
+  }
+
+
+  /** Releases one lock on the buffer; the argument is cast to LlapCacheableBuffer. */
+  @Override
+  public void releaseBuffer(LlapMemoryBuffer buffer) {
+    releaseBufferInternal((LlapCacheableBuffer)buffer);
+  }
+
+  /**
+   * Unlocks the buffer once. When unlock() returns 0 (presumably the remaining lock count;
+   * confirm against LlapCacheableBuffer) the cache policy is told about it and an event is
+   * recorded for the cleanup thread.
+   */
+  public void releaseBufferInternal(LlapCacheableBuffer buffer) {
+    if (buffer.unlock() == 0) {
+      cachePolicy.notifyUnlock(buffer);
+      unblockEviction();
+    }
+  }
+
+  /**
+   * Creates a buffer via LlapCacheableBuffer(null, -1, -1); presumably a placeholder with no
+   * backing data - confirm against the constructor.
+   */
+  public static LlapCacheableBuffer allocateFake() {
+    return new LlapCacheableBuffer(null, -1, -1);
+  }
+
+  /** Bumps the counter that the cleanup thread polls before it starts a round. */
+  public void unblockEviction() {
+    newEvictions.incrementAndGet();
+  }
+
+  /** Eviction callback; currently does nothing. */
+  @Override
+  public void notifyEvicted(LlapCacheableBuffer buffer) {
+
+  }
+
+  /**
+   * Low-priority daemon thread that slowly walks every per-file cache, removes entries whose
+   * buffer reports isInvalid(), and removes per-file caches that ended up empty.
+   */
+  private final class CleanupThread extends Thread {
+    /** Target duration of one full pass over the cache, in seconds (an estimate). */
+    private static final int APPROX_CLEANUP_INTERVAL_SEC = 600;
+
+    public CleanupThread() {
+      super("Llap ChunkPool cleanup thread");
+      setDaemon(true);
+      setPriority(1);
+    }
+
+    /** Runs cleanup rounds until the thread is interrupted or a round fails. */
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          doOneCleanupRound();
+        } catch (InterruptedException ex) {
+          LlapIoImpl.LOG.warn("Cleanup thread has been interrupted");
+          Thread.currentThread().interrupt();
+          break;
+        } catch (Throwable t) {
+          LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t);
+          break;
+        }
+      }
+    }
+
+    /**
+     * Waits until newEvictions is non-zero, then makes one paced, best-effort pass over all
+     * file caches.
+     * @throws InterruptedException if interrupted while waiting or sleeping.
+     */
+    private void doOneCleanupRound() throws InterruptedException {
+      // Nothing in the visible code notifies this monitor, so the wait acts as a 10s poll.
+      while (true) {
+        int evictionsSinceLast = newEvictions.getAndSet(0);
+        if (evictionsSinceLast > 0) break;
+        synchronized (newEvictions) {
+          newEvictions.wait(10000);
+        }
+      }
+      // Duration is an estimate; if the size of the map changes, it can be very different.
+      long endTime = System.nanoTime() + APPROX_CLEANUP_INTERVAL_SEC * 1000000000L;
+      int leftToCheck = 0; // approximate
+      for (FileCache fc : cache.values()) {
+        leftToCheck += fc.cache.size();
+      }
+      // TODO: if these super-long-lived iterator affects the map in some bad way,
+      //       we'd need to sleep once per round instead.
+      // Iterate thru all the filecaches. This is best-effort.
+      Iterator<Map.Entry<String, FileCache>> iter = cache.entrySet().iterator();
+      while (iter.hasNext()) {
+        FileCache fc = iter.next().getValue();
+        if (!fc.incRef()) {
+          throw new AssertionError("Something other than cleanup is removing elements from map");
+        }
+        // Iterate thru the file cache. This is best-effort.
+        Iterator<Map.Entry<Long, LlapCacheableBuffer>> subIter = fc.cache.entrySet().iterator();
+        boolean isEmpty = true;
+        while (subIter.hasNext()) {
+          // Spread the remaining time over the remaining entries. Once the deadline has
+          // passed the quotient would be negative, and Thread.sleep rejects negative values,
+          // so fall back to the minimal 1ms pause.
+          long remainingNs = endTime - System.nanoTime();
+          Thread.sleep((leftToCheck <= 0 || remainingNs <= 0)
+              ? 1 : remainingNs / (1000000L * leftToCheck));
+          if (subIter.next().getValue().isInvalid()) {
+            subIter.remove();
+          } else {
+            isEmpty = false;
+          }
+          --leftToCheck;
+        }
+        if (!isEmpty) {
+          fc.decRef();
+          continue;
+        }
+        // FileCache might be empty; see if we can remove it. "tryWriteLock"
+        if (!fc.startEvicting()) {
+          // Someone else holds a reference; give ours back, otherwise the count can never
+          // drop to 1 again and this FileCache would never become removable.
+          fc.decRef();
+          continue;
+        }
+        if (fc.cache.isEmpty()) {
+          fc.commitEvicting();
+          iter.remove();
+        } else {
+          fc.abortEvicting();
+        }
+      }
+    }
+  }
+
+  /** One contiguous direct buffer plus the per-size free lists that partition it. */
+  private class arena {
+    /**
+     * Allocates the backing buffer and links all maximum-size blocks into the free list
+     * of the largest size class.
+     */
+    void init() {
+      int maxMinAllocs = 1 << (arenaSizeLog2 - minAllocLog2);
+      headers = new byte[maxMinAllocs];
+      int allocLog2Diff = maxAllocLog2 - minAllocLog2;
+      // Size classes run from minAllocLog2 to maxAllocLog2 inclusive, hence the + 1.
+      freeLists = new FreeList[allocLog2Diff + 1];
+      for (int i = 0; i < freeLists.length; ++i) {
+        freeLists[i] = new FreeList();
+      }
+      int maxMaxAllocs = 1 << (arenaSizeLog2 - maxAllocLog2),
+          headerIndex = 0, headerIncrement = 1 << allocLog2Diff;
+      // The loop below writes links at maxMaxAllocs offsets spaced maxAllocation apart,
+      // so the buffer has to cover all of them.
+      data = ByteBuffer.allocateDirect(maxMaxAllocs * maxAllocation);
+      // Same index that is stored in the headers below for maximum-size blocks.
+      freeLists[allocLog2Diff].listHead = 0;
+      for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += maxAllocation) {
+        // TODO: will this cause bugs on large numbers due to some Java sign bit stupidity?
+        headers[headerIndex] = (byte)(allocLog2Diff << 1); // Maximum allocation size
+        // Free blocks store their previous/next links (in header-index units) in their
+        // first 8 bytes; -1 terminates the list.
+        data.putInt(offset, (i == 0) ? -1 : (headerIndex - headerIncrement));
+        data.putInt(offset + 4, (i == maxMaxAllocs - 1) ? -1 : (headerIndex + headerIncrement));
+        headerIndex += headerIncrement;
+      }
+    }
+    ByteBuffer data;
+    // Avoid storing headers with data since we expect binary size allocations.
+    // Each headers[i] is a "virtual" byte at i * minAllocation.
+    byte[] headers;
+    FreeList[] freeLists;
+  }
+
+  /** Head of the free list for one size class, with the lock that guards it. */
+  private static class FreeList {
+    ReentrantLock lock = new ReentrantLock(false);
+    int listHead = -1; // Index of where the buffer is; in minAllocation units
+    // TODO: One possible improvement - store blocks arriving left over from splits, and
+    //       blocks requested, to be able to wait for pending splits and reduce fragmentation.
+    //       However, we are trying to increase fragmentation now, since we cater to single-size.
+  }
+
+  // TODO##: separate the classes?
+  /**
+   * Per-file map of offset to buffer, guarded by a reference count. A count of zero or
+   * more is the number of current users; the two negative sentinels mark a cache that is
+   * being evicted or has been evicted.
+   */
+  private static class FileCache {
+    private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
+    // TODO: given the specific data, perhaps the nested thing should not be CHM
+    private ConcurrentHashMap<Long, LlapCacheableBuffer> cache
+      = new ConcurrentHashMap<Long, LlapCacheableBuffer>();
+    private AtomicInteger refCount = new AtomicInteger(0);
+
+    /**
+     * Takes a reference.
+     * @return false if this cache has already been evicted, true once the count was bumped.
+     */
+    boolean incRef() {
+      for (;;) {
+        int current = refCount.get();
+        if (current == EVICTED_REFCOUNT) return false;
+        if (current != EVICTING_REFCOUNT) {
+          assert current >= 0;
+          if (refCount.compareAndSet(current, current + 1)) return true;
+        }
+        // Either eviction is in flight or the CAS lost a race; look again.
+      }
+    }
+
+    /** Gives a reference back; a negative result means unbalanced calls. */
+    void decRef() {
+      int remaining = refCount.decrementAndGet();
+      if (remaining < 0) {
+        throw new AssertionError("Unexpected refCount " + remaining);
+      }
+    }
+
+    /**
+     * Tries to move from "exactly one reference" to the evicting state.
+     * @return true if the state was switched, false if the count was not 1.
+     */
+    boolean startEvicting() {
+      do {
+        if (refCount.get() != 1) return false;
+      } while (!refCount.compareAndSet(1, EVICTING_REFCOUNT));
+      return true;
+    }
+
+    /** Finishes an eviction started by startEvicting(). */
+    void commitEvicting() {
+      boolean swapped = refCount.compareAndSet(EVICTING_REFCOUNT, EVICTED_REFCOUNT);
+      assert swapped;
+    }
+
+    /** Cancels an eviction started by startEvicting(); the count goes back to zero. */
+    void abortEvicting() {
+      boolean swapped = refCount.compareAndSet(EVICTING_REFCOUNT, 0);
+      assert swapped;
+    }
+  }
+}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java?rev=1650717&view=auto ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java (added) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java Sat Jan 10 02:38:17 2015 @@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+/** Eviction policy for the low-level cache: tracks buffers and accounts for cache memory. */
+public interface LowLevelCachePolicy {
+  /** Starts tracking the buffer so that it can later be chosen for eviction. */
+  void cache(LlapCacheableBuffer buffer);
+  /** Tells the policy the buffer has been locked (presumably by a reader; callers not shown). */
+  void notifyLock(LlapCacheableBuffer buffer);
+  /** Tells the policy the buffer has been unlocked. */
+  void notifyUnlock(LlapCacheableBuffer buffer);
+  /** Reserves the given amount of cache memory; implementations may evict to make room. */
+  void reserveMemory(long total);
+}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java?rev=1650717&view=auto ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java (added) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java Sat Jan 10 02:38:17 2015 @@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Shared memory accounting for cache policies. Keeps a running total of reserved bytes,
+ * bounded by maxSize, and asks the concrete policy to evict when a reservation does not fit.
+ */
+public abstract class LowLevelCachePolicyBase implements LowLevelCachePolicy {
+  private final AtomicLong usedMemory;
+  private final long maxSize;
+  private final EvictionListener evictionListener;
+
+  /**
+   * @param maxSize upper bound for the reserved total.
+   * @param listener passed to evictSomeBlocks so it can report evicted buffers.
+   */
+  public LowLevelCachePolicyBase(long maxSize, EvictionListener listener) {
+    this.maxSize = maxSize;
+    this.usedMemory = new AtomicLong(0);
+    this.evictionListener = listener;
+  }
+
+  /**
+   * Reserves memoryToReserve, evicting blocks as needed. Does not return until the whole
+   * amount has been reserved.
+   */
+  @Override
+  public void reserveMemory(long memoryToReserve) {
+    // TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point?
+    while (memoryToReserve > 0) {
+      long usedMem = usedMemory.get(), newUsedMem = usedMem + memoryToReserve;
+      if (newUsedMem <= maxSize) {
+        // Fits without eviction; retry from the top if another thread changed the total.
+        if (usedMemory.compareAndSet(usedMem, newUsedMem)) break;
+        continue;
+      }
+      // TODO: for one-block case, we could move notification for the last block out of the loop.
+      long evicted = evictSomeBlocks(memoryToReserve, evictionListener);
+      // Adjust the memory - we have to account for what we have just evicted.
+      while (true) {
+        long reserveWithEviction = Math.min(memoryToReserve, maxSize - usedMem + evicted);
+        // The evicted bytes are no longer in use, so they must come off the total;
+        // otherwise the total exceeds maxSize and later reservations can never fit.
+        if (usedMemory.compareAndSet(usedMem, usedMem - evicted + reserveWithEviction)) {
+          memoryToReserve -= reserveWithEviction;
+          break;
+        }
+        usedMem = usedMemory.get();
+      }
+    }
+  }
+
+  /**
+   * Evicts blocks until roughly memoryToReserve has been freed or nothing more can be evicted.
+   * @return the amount actually freed.
+   */
+  protected abstract long evictSomeBlocks(long memoryToReserve, EvictionListener listener);
+}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java?rev=1650717&view=auto ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java (added) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java Sat Jan 10 02:38:17 2015 @@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * First-in-first-out policy: buffers are kept in insertion order and eviction walks them
+ * from the oldest, skipping any buffer that refuses to be invalidated.
+ */
+public class LowLevelFifoCachePolicy extends LowLevelCachePolicyBase {
+  private final Lock lock = new ReentrantLock();
+  private final LinkedHashSet<LlapCacheableBuffer> buffers;
+
+  /**
+   * @param expectedBufferSize typical buffer size, used only to pre-size the set.
+   * @param maxCacheSize memory limit handed to the base class.
+   * @param listener receives every evicted buffer.
+   */
+  public LowLevelFifoCachePolicy(
+      int expectedBufferSize, long maxCacheSize, EvictionListener listener) {
+    super(maxCacheSize, listener);
+    double bufferCountEstimate = Math.ceil((maxCacheSize * 1.0) / expectedBufferSize);
+    int expectedBuffers = (int)bufferCountEstimate;
+    // Divide by the load factor so the expected count fits without rehashing.
+    int initialCapacity = (int)(expectedBuffers / 0.75f);
+    buffers = new LinkedHashSet<LlapCacheableBuffer>(initialCapacity);
+  }
+
+  /** Appends the buffer to the eviction order. */
+  @Override
+  public void cache(LlapCacheableBuffer buffer) {
+    lock.lock();
+    try {
+      buffers.add(buffer);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void notifyLock(LlapCacheableBuffer buffer) {
+    // FIFO policy doesn't care.
+  }
+
+  @Override
+  public void notifyUnlock(LlapCacheableBuffer buffer) {
+    // FIFO policy doesn't care.
+  }
+
+  /**
+   * Evicts the oldest buffers that can be invalidated until memoryToReserve has been freed
+   * or the set is exhausted.
+   * @return total length of the evicted buffers.
+   */
+  @Override
+  protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) {
+    long freed = 0;
+    lock.lock();
+    try {
+      for (Iterator<LlapCacheableBuffer> it = buffers.iterator();
+          freed < memoryToReserve && it.hasNext();) {
+        LlapCacheableBuffer oldest = it.next();
+        if (!oldest.invalidate()) continue; // could not be invalidated; leave it in place
+        it.remove();
+        freed += oldest.length;
+        listener.notifyEvicted(oldest);
+      }
+    } finally {
+      lock.unlock();
+    }
+    return freed;
+  }
+}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java?rev=1650717&view=auto ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java (added) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java Sat Jan 10 02:38:17 2015 @@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Implementation of the "simple" algorithm from "On the Existence of a Spectrum of Policies
+ * that Subsumes the Least Recently Used (LRU) and Least Frequently Used (LFU) Policies".
+ * TODO: fix this, no longer true; with ORC as is, 4k buffers per gig of cache
+ * We expect the number of buffers to be relatively small (1000s), so we just use one heap.
+ *
+ * Buffers live in an array-backed min-heap ordered by priority; the root is the next
+ * eviction candidate. Locked buffers are treated as having Double.MAX_VALUE priority.
+ **/
+public class LowLevelLrfuCachePolicy extends LowLevelCachePolicyBase {
+  private final double lambda;
+  /** Decay function: 0.5 ^ (lambda * x). */
+  private final double f(long x) {
+    return Math.pow(0.5, lambda * x);
+  }
+  private static final double F0 = 1; // f(0) is always 1
+  /** Priority after an access at time: the decayed previous priority plus F0. */
+  private final double touchPriority(long time, long lastAccess, double previous) {
+    return F0 + f(time - lastAccess) * previous;
+  }
+  /** Priority decayed to time, without counting a new access. */
+  private final double expirePriority(long time, long lastAccess, double previous) {
+    return f(time - lastAccess) * previous;
+  }
+
+  private final AtomicLong timer = new AtomicLong(0);
+  /**
+   * The heap. Currently synchronized on itself; there is a number of papers out there
+   * with various lock-free/efficient priority queues which we can use if needed.
+   */
+  private final LlapCacheableBuffer[] heap;
+  /** Number of elements. */
+  private int heapSize = 0;
+
+  /**
+   * @param conf source of the LLAP_LRFU_LAMBDA setting.
+   * @param minBufferSize smallest buffer size; together with maxCacheSize it fixes the
+   *                      heap capacity.
+   * @param maxCacheSize memory limit handed to the base class.
+   * @param listener receives every evicted buffer.
+   */
+  public LowLevelLrfuCachePolicy(Configuration conf,
+      long minBufferSize, long maxCacheSize, EvictionListener listener) {
+    super(maxCacheSize, listener);
+    heap = new LlapCacheableBuffer[(int)Math.ceil((maxCacheSize * 1.0) / minBufferSize)];
+    lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
+  }
+
+  /** Adds a (locked) buffer at the end of the heap with the initial priority F0. */
+  @Override
+  public void cache(LlapCacheableBuffer buffer) {
+    buffer.lastUpdate = timer.incrementAndGet();
+    buffer.priority = F0;
+    assert buffer.isLocked();
+    buffer.isLockedInHeap = true;
+    synchronized (heap) {
+      // Ensured by reserveMemory.
+      assert heapSize < heap.length : heap.length + " >= " + heapSize;
+      buffer.indexInHeap = heapSize;
+      heapifyUpUnderLock(buffer, buffer.lastUpdate);
+      if (DebugUtils.isTraceEnabled()) {
+        LlapIoImpl.LOG.info(buffer + " inserted at " + buffer.lastUpdate);
+      }
+      ++heapSize;
+    }
+  }
+
+  /** Marks the buffer as locked and moves it down, away from the eviction end of the heap. */
+  @Override
+  public void notifyLock(LlapCacheableBuffer buffer) {
+    long time = timer.get();
+    synchronized (heap) {
+      buffer.isLockedInHeap = true;
+      heapifyDownUnderLock(buffer, time);
+    }
+  }
+
+  /** Counts the unlock as an access: bumps the priority, clears the lock mark, re-sorts. */
+  @Override
+  public void notifyUnlock(LlapCacheableBuffer buffer) {
+    long time = timer.incrementAndGet();
+    synchronized (heap) {
+      if (DebugUtils.isTraceCachingEnabled()) {
+        LlapIoImpl.LOG.info("Touching " + buffer + " at " + time);
+      }
+      buffer.priority = touchPriority(time, buffer.lastUpdate, buffer.priority);
+      buffer.lastUpdate = time;
+      buffer.isLockedInHeap = false;
+      // Buffer's priority just decreased from boosted lock priority, so move up.
+      heapifyUpUnderLock(buffer, time);
+    }
+  }
+
+  /**
+   * Removes and returns the heap root if it can be invalidated.
+   * @return the evicted buffer, or null if the heap is empty or the root refused invalidation.
+   */
+  private LlapCacheableBuffer evictFromHeapUnderLock(long time) {
+    if (heapSize == 0) return null;
+    LlapCacheableBuffer result = heap[0];
+    if (!result.invalidate()) {
+      // We boost the priority of locked buffers to a very large value;
+      // this means entire heap is locked. TODO: need to work around that for small pools?
+      if (DebugUtils.isTraceCachingEnabled()) {
+        LlapIoImpl.LOG.info("Failed to invalidate head " + result.toString() + "; size = " + heapSize);
+      }
+      return null;
+    }
+    if (DebugUtils.isTraceCachingEnabled()) {
+      LlapIoImpl.LOG.info("Evicting " + result + " at " + time);
+    }
+    result.indexInHeap = -1;
+    --heapSize;
+    LlapCacheableBuffer newRoot = heap[heapSize];
+    // Clear the vacated slot so the array does not keep a stale reference.
+    heap[heapSize] = null;
+    // When the evicted buffer was the only element there is nothing to promote; without
+    // this check the evicted buffer itself would be put back at the root.
+    if (heapSize == 0) return result;
+    newRoot.indexInHeap = 0;
+    if (newRoot.lastUpdate != time && !newRoot.isLockedInHeap) {
+      newRoot.priority = expirePriority(time, newRoot.lastUpdate, newRoot.priority);
+      newRoot.lastUpdate = time;
+    }
+    heapifyDownUnderLock(newRoot, time);
+    return result;
+  }
+
+  /** Sifts the buffer down from buffer.indexInHeap until neither child has a lower priority. */
+  private void heapifyDownUnderLock(LlapCacheableBuffer buffer, long time) {
+    // Relative positions of the blocks don't change over time; priorities we expire can only
+    // decrease; we only have one block that could have broken heap rule and we always move it
+    // down; therefore, we can update priorities of other blocks as we go for part of the heap -
+    // we correct any discrepancy w/the parent after expiring priority, and any block we expire
+    // the priority for already has lower priority than that of its children.
+    // TODO: avoid expiring priorities if times are close? might be needlessly expensive.
+    int ix = buffer.indexInHeap;
+    double priority = buffer.isLockedInHeap ? Double.MAX_VALUE : buffer.priority;
+    while (true) {
+      int leftIx = (ix << 1) + 1, rightIx = leftIx + 1;
+      if (leftIx >= heapSize) break; // Buffer is at the leaf node.
+      LlapCacheableBuffer left = heap[leftIx], right = null;
+      if (rightIx < heapSize) {
+        right = heap[rightIx];
+      }
+      // A missing right child reports Double.MAX_VALUE, so it is never chosen below.
+      double leftPri = getHeapifyPriority(left, time), rightPri = getHeapifyPriority(right, time);
+      if (priority <= leftPri && priority <= rightPri) break;
+      if (leftPri <= rightPri) { // prefer left, cause right might be missing
+        heap[ix] = left;
+        left.indexInHeap = ix;
+        ix = leftIx;
+      } else {
+        heap[ix] = right;
+        right.indexInHeap = ix;
+        ix = rightIx;
+      }
+    }
+    buffer.indexInHeap = ix;
+    heap[ix] = buffer;
+  }
+
+  /** Sifts the buffer up from buffer.indexInHeap until its parent has a lower-or-equal priority. */
+  private void heapifyUpUnderLock(LlapCacheableBuffer buffer, long time) {
+    // See heapifyDown comment.
+    int ix = buffer.indexInHeap;
+    double priority = buffer.isLockedInHeap ? Double.MAX_VALUE : buffer.priority;
+    while (true) {
+      if (ix == 0) break; // Buffer is at the top of the heap.
+      int parentIx = (ix - 1) >>> 1;
+      LlapCacheableBuffer parent = heap[parentIx];
+      double parentPri = getHeapifyPriority(parent, time);
+      if (priority >= parentPri) break;
+      heap[ix] = parent;
+      parent.indexInHeap = ix;
+      ix = parentIx;
+    }
+    buffer.indexInHeap = ix;
+    heap[ix] = buffer;
+  }
+
+  /**
+   * Priority to compare against while sifting: Double.MAX_VALUE for a missing or locked
+   * buffer, otherwise the stored priority decayed to time (the decayed value is saved).
+   */
+  private double getHeapifyPriority(LlapCacheableBuffer buf, long time) {
+    if (buf == null || buf.isLockedInHeap) return Double.MAX_VALUE;
+    if (buf.lastUpdate != time) {
+      buf.priority = expirePriority(time, buf.lastUpdate, buf.priority);
+      buf.lastUpdate = time;
+    }
+    return buf.priority;
+  }
+
+  /** Renders the heap level by level, one line per level, for debugging. */
+  public String debugDumpHeap() {
+    if (heapSize == 0) return "<empty>";
+    int levels = 32 - Integer.numberOfLeadingZeros(heapSize);
+    StringBuilder result = new StringBuilder();
+    int ix = 0;
+    int spacesCount = heap[0].toStringForCache().length() + 3;
+    String full = StringUtils.repeat(" ", spacesCount),
+        half = StringUtils.repeat(" ", spacesCount / 2);
+    int maxWidth = 1 << (levels - 1);
+    for (int i = 0; i < levels; ++i) {
+      int width = 1 << i;
+      int middleGap = (maxWidth - width) / width;
+      for (int j = 0; j < (middleGap >>> 1); ++j) {
+        result.append(full);
+      }
+      if ((middleGap & 1) == 1) {
+        result.append(half);
+      }
+      for (int j = 0; j < width && ix < heapSize; ++j, ++ix) {
+        if (j != 0) {
+          for (int k = 0; k < middleGap; ++k) {
+            result.append(full);
+          }
+          if (middleGap == 0) {
+            result.append(" ");
+          }
+        }
+        if ((j & 1) == 0) {
+          result.append("(");
+        }
+        result.append(heap[ix].toStringForCache());
+        if ((j & 1) == 1) {
+          result.append(")");
+        }
+      }
+      result.append("\n");
+    }
+    return result.toString();
+  }
+
+  /** Evicts the current heap root, if possible, without notifying any listener. */
+  @VisibleForTesting
+  public LlapCacheableBuffer evictOneMoreBlock() {
+    synchronized (heap) {
+      return evictFromHeapUnderLock(timer.get());
+    }
+  }
+
+  /**
+   * Evicts heap roots one at a time until memoryToReserve has been freed or the root cannot
+   * be evicted.
+   * @return total length of the evicted buffers.
+   */
+  @Override
+  protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) {
+    long evicted = 0;
+    while (evicted < memoryToReserve) {
+      LlapCacheableBuffer buffer = evictOneMoreBlock();
+      if (buffer == null) return evicted;
+      evicted += buffer.length;
+      listener.notifyEvicted(buffer);
+    }
+    return evicted;
+  }
+}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java Sat Jan 10 02:38:17 2015 @@ -18,16 +18,16 @@ package org.apache.hadoop.hive.llap.cache; -import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer; +import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer; public class NoopCache<CacheKey> implements Cache<CacheKey> { @Override - public LlapBuffer cacheOrGet(CacheKey key, LlapBuffer value) { + public ColumnBuffer cacheOrGet(CacheKey key, ColumnBuffer value) { return value; } @Override - public LlapBuffer get(CacheKey key) { + public
ColumnBuffer get(CacheKey key) { return null; // TODO: ensure real implementation increases refcount } } Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java Sat Jan 10 02:38:17 2015 @@ -22,7 +22,7 @@ package org.apache.hadoop.hive.llap.io.a import java.util.List; import java.io.IOException; -import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer; +import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; public interface VectorReader { @@ -33,7 +33,7 @@ public interface VectorReader { public static class ColumnVectorBatch { public ColumnVector[] cols; public int size; - public List<LlapBuffer> lockedBuffers; + public List<ColumnBuffer> lockedBuffers; } public ColumnVectorBatch next() throws InterruptedException, IOException; public void close() throws IOException; Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java Sat Jan 10 02:38:17 2015 @@ -26,11 +26,10 @@ import 
org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.cache.Cache; -import org.apache.hadoop.hive.llap.cache.JavaAllocator; +import org.apache.hadoop.hive.llap.cache.LowLevelBuddyCache; import org.apache.hadoop.hive.llap.cache.NoopCache; import org.apache.hadoop.hive.llap.io.api.LlapIo; import org.apache.hadoop.hive.llap.io.api.VectorReader; -import org.apache.hadoop.hive.llap.io.api.cache.Allocator; import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey; import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer; import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataProducer; @@ -53,11 +52,8 @@ public class LlapIoImpl implements LlapI private LlapIoImpl(Configuration conf) throws IOException { this.conf = conf; - // ChunkPool<OrcLoader.ChunkKey> chunkPool = new ChunkPool<OrcLoader.ChunkKey>(); - // new BufferPool(conf, chunkPool) - Allocator allocator = new JavaAllocator(); - Cache<OrcCacheKey> cache = new NoopCache<OrcCacheKey>(); - this.edp = new OrcEncodedDataProducer(allocator, cache, conf); + Cache<OrcCacheKey> cache = new NoopCache<OrcCacheKey>(); // High-level cache not supported yet. 
+ this.edp = new OrcEncodedDataProducer(new LowLevelBuddyCache(conf), cache, conf); this.cvp = new OrcColumnVectorProducer(edp, conf); } @@ -66,7 +62,7 @@ public class LlapIoImpl implements LlapI getOrCreateInstance(conf); } - // TODO#: Add "create" method in a well-defined place when server is started + // TODO: Add "create" method in a well-defined place when server is started public static LlapIo getOrCreateInstance(Configuration conf) { if (ioImpl != null) return ioImpl; synchronized (instanceLock) { Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java Sat Jan 10 02:38:17 2015 @@ -26,9 +26,8 @@ import java.util.List; import org.apache.hadoop.hive.llap.Consumer; import org.apache.hadoop.hive.llap.ConsumerFeedback; import org.apache.hadoop.hive.llap.io.api.EncodedColumn; +import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer; import org.apache.hadoop.hive.llap.io.api.VectorReader.ColumnVectorBatch; -import org.apache.hadoop.hive.llap.io.api.cache.Allocator; -import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer; import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer; import org.apache.hadoop.hive.llap.io.encoded.EncodedDataReader; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; @@ -38,10 +37,10 @@ import org.apache.hadoop.mapred.InputSpl public abstract class ColumnVectorProducer<BatchKey> { static class EncodedColumnBatch { public EncodedColumnBatch(int colCount) { - 
columnDatas = new LlapBuffer[colCount]; + columnDatas = new ColumnBuffer[colCount]; columnsRemaining = colCount; } - public LlapBuffer[] columnDatas; + public ColumnBuffer[] columnDatas; public int columnsRemaining; } @@ -51,7 +50,7 @@ public abstract class ColumnVectorProduc // TODO: use array, precreate array based on metadata first? Works for ORC. For now keep dumb. private final HashMap<BatchKey, EncodedColumnBatch> pendingData = new HashMap<BatchKey, EncodedColumnBatch>(); - private ConsumerFeedback<LlapBuffer> upstreamFeedback; + private ConsumerFeedback<ColumnBuffer> upstreamFeedback; private final Consumer<ColumnVectorBatch> downstreamConsumer; private final int colCount; @@ -60,7 +59,7 @@ public abstract class ColumnVectorProduc this.colCount = colCount; } - public void init(ConsumerFeedback<LlapBuffer> upstreamFeedback) { + public void init(ConsumerFeedback<ColumnBuffer> upstreamFeedback) { this.upstreamFeedback = upstreamFeedback; } @@ -134,13 +133,14 @@ public abstract class ColumnVectorProduc @Override public void returnData(ColumnVectorBatch data) { - for (LlapBuffer lockedBuffer : data.lockedBuffers) { + // TODO#: this should happen earlier, when data is decoded buffers are not needed + for (ColumnBuffer lockedBuffer : data.lockedBuffers) { upstreamFeedback.returnData(lockedBuffer); } } private void dicardPendingData(boolean isStopped) { - List<LlapBuffer> dataToDiscard = new ArrayList<LlapBuffer>(pendingData.size() * colCount); + List<ColumnBuffer> dataToDiscard = new ArrayList<ColumnBuffer>(pendingData.size() * colCount); List<EncodedColumnBatch> batches = new ArrayList<EncodedColumnBatch>(pendingData.size()); synchronized (pendingData) { if (isStopped) { @@ -151,13 +151,13 @@ public abstract class ColumnVectorProduc } for (EncodedColumnBatch batch : batches) { synchronized (batch) { - for (LlapBuffer b : batch.columnDatas) { + for (ColumnBuffer b : batch.columnDatas) { dataToDiscard.add(b); } batch.columnDatas = null; } } - for (LlapBuffer data : 
dataToDiscard) { + for (ColumnBuffer data : dataToDiscard) { upstreamFeedback.returnData(data); } } Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java Sat Jan 10 02:38:17 2015 @@ -21,8 +21,8 @@ package org.apache.hadoop.hive.llap.io.e import java.io.IOException; import org.apache.hadoop.hive.llap.ConsumerFeedback; -import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer; +import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer; -public interface EncodedDataReader<BatchKey> extends ConsumerFeedback<LlapBuffer> { +public interface EncodedDataReader<BatchKey> extends ConsumerFeedback<ColumnBuffer> { public void start() throws IOException; } Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java Sat Jan 10 02:38:17 2015 @@ -30,8 +30,8 @@ import org.apache.hadoop.hive.llap.Consu import org.apache.hadoop.hive.llap.DebugUtils; import 
org.apache.hadoop.hive.llap.cache.Cache; import org.apache.hadoop.hive.llap.io.api.EncodedColumn; -import org.apache.hadoop.hive.llap.io.api.cache.Allocator; -import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer; +import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer; +import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey; import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey; @@ -48,9 +48,9 @@ import org.apache.hadoop.mapred.InputSpl public class OrcEncodedDataProducer implements EncodedDataProducer<OrcBatchKey> { private FileSystem cachedFs = null; + private final LowLevelCache lowLevelCache; private Configuration conf; private OrcMetadataCache metadataCache; - private final Allocator allocator; private final Cache<OrcCacheKey> cache; private class OrcEncodedDataReader implements EncodedDataReader<OrcBatchKey>, @@ -140,7 +140,7 @@ public class OrcEncodedDataProducer impl } RecordReader stripeReader = orcReader.rows(si.getOffset(), si.getLength(), includes); // We pass in the already-filtered RGs, as well as sarg. ORC can apply additional filtering. 
- stripeReader.readEncodedColumns(colRgs, rgCount, sarg, this, allocator); + stripeReader.readEncodedColumns(colRgs, rgCount, sarg, this, lowLevelCache); stripeReader.close(); } @@ -151,7 +151,7 @@ public class OrcEncodedDataProducer impl } @Override - public void returnData(LlapBuffer data) { + public void returnData(ColumnBuffer data) { // TODO#: return the data to cache (unlock) } @@ -231,7 +231,7 @@ public class OrcEncodedDataProducer impl boolean areAllRgsInCache = true; for (int rgIx = 0; rgIx < rgCount; ++rgIx) { key.rgIx = rgIx; - LlapBuffer cached = cache.get(key); + ColumnBuffer cached = cache.get(key); if (cached == null) { areAllRgsInCache = false; continue; @@ -274,9 +274,9 @@ public class OrcEncodedDataProducer impl public void consumeData(EncodedColumn<OrcBatchKey> data) { // Store object in cache; create new key object - cannot be reused. OrcCacheKey key = new OrcCacheKey(data.batchKey, data.columnIndex); - LlapBuffer cached = cache.cacheOrGet(key, data.columnData); + ColumnBuffer cached = cache.cacheOrGet(key, data.columnData); if (data.columnData != cached) { - allocator.deallocate(data.columnData); + // TODO: deallocate columnData data.columnData = cached; } consumer.consumeData(data); @@ -301,15 +301,15 @@ public class OrcEncodedDataProducer impl } private static int align64(int number) { - int rem = number & 63; - return number - rem + (rem == 0 ? 0 : 64); + return ((number + 63) & ~63); } - public OrcEncodedDataProducer(Allocator allocator, Cache<OrcCacheKey> cache, Configuration conf) throws IOException { + public OrcEncodedDataProducer(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache, + Configuration conf) throws IOException { // We assume all splits will come from the same FS. 
this.cachedFs = FileSystem.get(conf); this.cache = cache; - this.allocator = allocator; + this.lowLevelCache = lowLevelCache; this.conf = conf; this.metadataCache = null; } @@ -319,5 +319,4 @@ public class OrcEncodedDataProducer impl SearchArgument sarg, Consumer<EncodedColumn<OrcBatchKey>> consumer) { return new OrcEncodedDataReader(split, columnIds, sarg, consumer); } - } Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java Sat Jan 10 02:38:17 2015 @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.llap.Consumer; import org.apache.hadoop.hive.llap.io.api.EncodedColumn; -import org.apache.hadoop.hive.llap.io.api.cache.Allocator; +import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.orc.*; @@ -99,7 +99,7 @@ public class LLAPRecordReaderImpl extend @Override public void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg, - Consumer<EncodedColumn<OrcBatchKey>> consumer, Allocator allocator) { + Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache) { } } Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java URL: 
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java Sat Jan 10 02:38:17 2015 @@ -39,11 +39,9 @@ public class BufferPool { public BufferPool(Configuration conf) { - this.maxCacheSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_CACHE_SIZE); - this.bufferSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_BUFFER_SIZE); - this.cachePolicy = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU) - ? new LrfuCachePolicy(conf, bufferSize, maxCacheSize) - : new FifoCachePolicy(bufferSize, maxCacheSize); + this.maxCacheSize = 0;// HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_CACHE_SIZE); + this.bufferSize = 0; // HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_BUFFER_SIZE); + this.cachePolicy = null; } /** Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java Sat Jan 10 02:38:17 2015 @@ -54,11 +54,9 @@ public class ChunkPool<K> /*implements E * @return Chunk corresponding to k. 
*/ public Chunk getChunk(K key, HashSet<WeakBuffer> lockedBuffers) { - Chunk result = chunkCache.get(key); - if (result == null) { - return null; - } while (true) { + Chunk result = chunkCache.get(key); + if (result == null) return null; if (lockChunk(result, lockedBuffers)) return result; if (chunkCache.remove(key, result)) return null; } Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java (original) +++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java Sat Jan 10 02:38:17 2015 @@ -29,7 +29,7 @@ import org.junit.Assume; import org.junit.Test; import static org.junit.Assert.*; -public class TestLrfuCachePolicy { +public class TestLrfuCachePolicy {/* TODO: switch to LowLevel one private static final Log LOG = LogFactory.getLog(TestLrfuCachePolicy.class); @Test @@ -221,5 +221,5 @@ public class TestLrfuCachePolicy { debugStr += inserted.get(i); } return debugStr; - } + }*/ } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Sat Jan 10 02:38:17 2015 @@ -152,6 +152,7 @@ abstract class InStream extends InputStr currentRange = 0; } + // TODO: this should allocate from cache private 
ByteBuffer allocateBuffer(int size) { // TODO: use the same pool as the ORC readers if(isDirect == true) { Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Sat Jan 10 02:38:17 2015 @@ -23,7 +23,7 @@ import org.apache.hadoop.hive.ql.exec.ve import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.llap.Consumer; import org.apache.hadoop.hive.llap.io.api.EncodedColumn; -import org.apache.hadoop.hive.llap.io.api.cache.Allocator; +import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey; @@ -97,5 +97,5 @@ public interface RecordReader { * @param allocator Allocator to allocate memory. 
*/ void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg, - Consumer<EncodedColumn<OrcBatchKey>> consumer, Allocator allocator); + Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache); } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1650717&r1=1650716&r2=1650717&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Sat Jan 10 02:38:17 2015 @@ -45,7 +45,7 @@ import org.apache.hadoop.hive.common.typ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.Consumer; import org.apache.hadoop.hive.llap.io.api.EncodedColumn; -import org.apache.hadoop.hive.llap.io.api.cache.Allocator; +import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; @@ -2676,11 +2676,13 @@ public class RecordReaderImpl implements ) throws IOException { long start = stripe.getIndexLength(); long end = start + stripe.getDataLength(); + // TODO: planning should be added here too, to take cache into account // explicitly trigger 1 big read DiskRange[] ranges = new DiskRange[]{new DiskRange(start, end)}; bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), Arrays.asList(ranges)); List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList(); createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams); + // TODO: decompressed data from streams should be put in cache } /** @@ -3050,6 +3052,7 @@ public class RecordReaderImpl 
implements private void readPartialDataStreams(StripeInformation stripe ) throws IOException { List<OrcProto.Stream> streamList = stripeFooter.getStreamsList(); + // TODO: planning should take cache into account List<DiskRange> chunks = planReadPartialDataStreams(streamList, indexes, included, includedRowGroups, codec != null, @@ -3062,8 +3065,8 @@ public class RecordReaderImpl implements LOG.debug("merge = " + stringifyDiskRanges(chunks)); } bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), chunks); - createStreams(streamList, bufferChunks, included, codec, bufferSize, - streams); + // TODO: decompressed data from streams should be put in cache + createStreams(streamList, bufferChunks, included, codec, bufferSize, streams); } @Override @@ -3300,7 +3303,7 @@ public class RecordReaderImpl implements @Override public void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg, - Consumer<EncodedColumn<OrcBatchKey>> consumer, Allocator allocator) { + Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache allocator) { // TODO: HERE read encoded data }