Repository: asterixdb Updated Branches: refs/heads/master c04046c11 -> fbf3c0a97
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java index 83e377a..3a22793 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java @@ -19,15 +19,16 @@ package org.apache.hyracks.storage.am.lsm.common.impls; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.replication.IIOReplicationManager; @@ -38,42 +39,71 @@ import org.apache.hyracks.storage.common.buffercache.IExtraPageBlockHelper; import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue; import org.apache.hyracks.storage.common.buffercache.VirtualPage; import org.apache.hyracks.storage.common.file.BufferedFileHandle; -import org.apache.hyracks.storage.common.file.IFileMapManager; import 
org.apache.hyracks.storage.common.file.FileMapManager; +import org.apache.hyracks.storage.common.file.IFileMapManager; import org.apache.hyracks.util.JSONUtil; public class VirtualBufferCache implements IVirtualBufferCache { - private static final Logger LOGGER = Logger.getLogger(ExternalIndexHarness.class.getName()); - - private static final int OVERFLOW_PADDING = 8; + private static final Logger LOGGER = Logger.getLogger(VirtualBufferCache.class.getName()); private final ICacheMemoryAllocator allocator; private final IFileMapManager fileMapManager; private final int pageSize; - private final int numPages; - + private final int pageBudget; private final CacheBucket[] buckets; - private final ArrayList<VirtualPage> pages; - - private volatile int nextFree; + private final BlockingQueue<VirtualPage> freePages; private final AtomicInteger largePages; - + private final AtomicInteger used; private boolean open; - public VirtualBufferCache(ICacheMemoryAllocator allocator, int pageSize, int numPages) { + public VirtualBufferCache(ICacheMemoryAllocator allocator, int pageSize, int pageBudget) { this.allocator = allocator; this.fileMapManager = new FileMapManager(); this.pageSize = pageSize; - this.numPages = 2 * (numPages / 2) + 1; - - buckets = new CacheBucket[this.numPages]; - pages = new ArrayList<>(); - nextFree = 0; + if (pageBudget == 0) { + throw new IllegalArgumentException("Page Budget Cannot be 0"); + } + this.pageBudget = pageBudget; + buckets = new CacheBucket[this.pageBudget]; + freePages = new ArrayBlockingQueue<>(this.pageBudget); largePages = new AtomicInteger(0); + used = new AtomicInteger(0); open = false; } @Override + public int getPageSize() { + return pageSize; + } + + @Override + public int getPageSizeWithHeader() { + return pageSize; + } + + public int getLargePages() { + return largePages.get(); + } + + public int getUsage() { + return used.get(); + } + + public int getPreAllocatedPages() { + return freePages.size(); + } + + @Override + public int 
getPageBudget() { + return pageBudget; + } + + @Override + public boolean isFull() { + return used.get() >= pageBudget; + } + + @Override public int createFile(FileReference fileRef) throws HyracksDataException { synchronized (fileMapManager) { return fileMapManager.registerFile(fileRef); @@ -82,16 +112,28 @@ public class VirtualBufferCache implements IVirtualBufferCache { @Override public int openFile(FileReference fileRef) throws HyracksDataException { - synchronized (fileMapManager) { - if (fileMapManager.isMapped(fileRef)) { - return fileMapManager.lookupFileId(fileRef); + try { + synchronized (fileMapManager) { + if (fileMapManager.isMapped(fileRef)) { + return fileMapManager.lookupFileId(fileRef); + } + return fileMapManager.registerFile(fileRef); } - return fileMapManager.registerFile(fileRef); + } finally { + logStats(); + } + } + + private void logStats() { + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.log(Level.INFO, "Free (allocated) pages = " + freePages.size() + ". Budget = " + pageBudget + + ". Large pages = " + largePages.get() + ". 
Overall usage = " + used.get()); } } @Override public void openFile(int fileId) throws HyracksDataException { + logStats(); } @Override @@ -111,6 +153,7 @@ public class VirtualBufferCache implements IVirtualBufferCache { synchronized (fileMapManager) { fileMapManager.unregisterFile(fileId); } + int reclaimedPages = 0; for (int i = 0; i < buckets.length; i++) { final CacheBucket bucket = buckets[i]; bucket.bucketLock.lock(); @@ -119,16 +162,20 @@ public class VirtualBufferCache implements IVirtualBufferCache { VirtualPage curr = bucket.cachedPage; while (curr != null) { if (BufferedFileHandle.getFileId(curr.dpid()) == fileId) { - if (curr.getFrameSizeMultiplier() > 1) { + reclaimedPages++; + if (curr.isLargePage()) { largePages.getAndAdd(-curr.getFrameSizeMultiplier()); + used.addAndGet(-curr.getFrameSizeMultiplier()); + } else { + used.decrementAndGet(); } if (prev == null) { bucket.cachedPage = curr.next(); - curr.reset(); + recycle(curr); curr = bucket.cachedPage; } else { prev.next(curr.next()); - curr.reset(); + recycle(curr); curr = prev.next(); } } else { @@ -140,54 +187,27 @@ public class VirtualBufferCache implements IVirtualBufferCache { bucket.bucketLock.unlock(); } } - defragPageList(); - } - - private void defragPageList() { - synchronized (pages) { - int start = 0; - int end = nextFree - 1; - while (start < end) { - VirtualPage lastUsed = pages.get(end); - while (end > 0 && lastUsed.dpid() == -1) { - --end; - lastUsed = pages.get(end); - } - - if (end == 0) { - nextFree = lastUsed.dpid() == -1 ? 
0 : 1; - break; - } - - VirtualPage firstUnused = pages.get(start); - while (start < end && firstUnused.dpid() != -1) { - ++start; - firstUnused = pages.get(start); - } - - if (start >= end) { - break; - } - - Collections.swap(pages, start, end); - nextFree = end; - --end; - ++start; - } + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.log(Level.INFO, "Reclaimed pages = " + reclaimedPages); } + logStats(); } - @Override - public ICachedPage tryPin(long dpid) throws HyracksDataException { - return pin(dpid, false); + private void recycle(VirtualPage page) { + // recycle only if + // 1. not a large page + // 2. allocation is not above budget + if (used.get() < pageBudget && !page.isLargePage()) { + page.reset(); + freePages.offer(page); + } } @Override public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException { - VirtualPage page = null; + VirtualPage page; int hash = hash(dpid); CacheBucket bucket = buckets[hash]; - bucket.bucketLock.lock(); try { page = bucket.cachedPage; @@ -197,15 +217,15 @@ public class VirtualBufferCache implements IVirtualBufferCache { } page = page.next(); } - if (!newPage) { + int fileId = BufferedFileHandle.getFileId(dpid); + FileReference fileRef; synchronized (fileMapManager) { - throw new HyracksDataException( - "Page " + BufferedFileHandle.getPageId(dpid) + " does not exist in file " - + fileMapManager.lookupFileName(BufferedFileHandle.getFileId(dpid))); + fileRef = fileMapManager.lookupFileName(fileId); } + throw HyracksDataException.create(ErrorCode.PAGE_DOES_NOT_EXIST_IN_FILE, + BufferedFileHandle.getPageId(dpid), fileRef); } - page = getOrAllocPage(dpid); page.next(bucket.cachedPage); bucket.cachedPage = page; @@ -222,18 +242,13 @@ public class VirtualBufferCache implements IVirtualBufferCache { } private VirtualPage getOrAllocPage(long dpid) { - VirtualPage page; - synchronized (pages) { - if (nextFree >= pages.size()) { - page = new VirtualPage(allocator.allocate(pageSize, 1)[0], pageSize); - 
page.multiplier(1); - pages.add(page); - } else { - page = pages.get(nextFree); - } - ++nextFree; - page.dpid(dpid); + VirtualPage page = freePages.poll(); + if (page == null) { + page = new VirtualPage(allocator.allocate(pageSize, 1)[0], pageSize); + page.multiplier(1); } + page.dpid(dpid); + used.incrementAndGet(); return page; } @@ -245,10 +260,26 @@ public class VirtualBufferCache implements IVirtualBufferCache { // no-op return; } + // Maintain counters + // In addition, discard pre-allocated pages as the multiplier of the large page + // This is done before actual resizing in order to allow GC for the same budget out of + // the available free pages first if (origMultiplier == 1) { - synchronized (pages) { - pages.remove(cPage); - nextFree--; + largePages.getAndAdd(multiplier); + int diff = multiplier - 1; + used.getAndAdd(diff); + for (int i = 0; i < diff; i++) { + freePages.poll(); + } + } else if (multiplier == 1) { + largePages.getAndAdd(-origMultiplier); + used.addAndGet(-origMultiplier + 1); + } else { + int diff = multiplier - origMultiplier; + largePages.getAndAdd(diff); + used.getAndAdd(diff); + for (int i = 0; i < diff; i++) { + freePages.poll(); } } ByteBuffer newBuffer = allocator.allocate(pageSize * multiplier, 1)[0]; @@ -257,15 +288,6 @@ public class VirtualBufferCache implements IVirtualBufferCache { oldBuffer.limit(newBuffer.capacity()); } newBuffer.put(oldBuffer); - if (origMultiplier == 1) { - largePages.getAndAdd(multiplier); - } else if (multiplier == 1) { - largePages.getAndAdd(-origMultiplier); - pages.add(0, (VirtualPage) cPage); - nextFree++; - } else { - largePages.getAndAdd(multiplier - origMultiplier); - } ((VirtualPage) cPage).buffer(newBuffer); ((VirtualPage) cPage).multiplier(multiplier); } @@ -275,7 +297,8 @@ public class VirtualBufferCache implements IVirtualBufferCache { } @Override - public void flushDirtyPage(ICachedPage page) throws HyracksDataException { + public void flush(ICachedPage page) throws HyracksDataException { + 
throw new UnsupportedOperationException(); } @Override @@ -283,59 +306,50 @@ public class VirtualBufferCache implements IVirtualBufferCache { } @Override - public int getPageSize() { - return pageSize; - } - - @Override - public int getPageSizeWithHeader() { - return pageSize; - } - - @Override - public int getNumPages() { - return numPages; - } - - @Override public void open() throws HyracksDataException { if (open) { - throw new HyracksDataException("Failed to open virtual buffercache since it is already open."); + throw HyracksDataException.create(ErrorCode.VBC_ALREADY_OPEN); } - pages.trimToSize(); - pages.ensureCapacity(numPages + OVERFLOW_PADDING); - allocator.reserveAllocation(pageSize, numPages); - for (int i = 0; i < numPages; i++) { + allocator.reserveAllocation(pageSize, pageBudget); + for (int i = 0; i < pageBudget; i++) { buckets[i] = new CacheBucket(); } - nextFree = 0; largePages.set(0); + used.set(0); open = true; } @Override public void reset() { - for (int i = 0; i < numPages; i++) { - buckets[i].cachedPage = null; - } - int excess = pages.size() - numPages; - if (excess > 0) { - for (int i = numPages + excess - 1; i >= numPages; i--) { - pages.remove(i); + recycleAllPages(); + used.set(0); + largePages.set(0); + } + + private void recycleAllPages() { + for (int i = 0; i < buckets.length; i++) { + final CacheBucket bucket = buckets[i]; + bucket.bucketLock.lock(); + try { + VirtualPage curr = bucket.cachedPage; + while (curr != null) { + bucket.cachedPage = curr.next(); + recycle(curr); + curr = bucket.cachedPage; + } + } finally { + bucket.bucketLock.unlock(); } } - nextFree = 0; - largePages.set(0); } @Override public void close() throws HyracksDataException { if (!open) { - throw new HyracksDataException("Failed to close virtual buffercache since it is already closed."); + throw HyracksDataException.create(ErrorCode.VBC_ALREADY_CLOSED); } - - pages.clear(); - for (int i = 0; i < numPages; i++) { + freePages.clear(); + for (int i = 0; i < 
pageBudget; i++) { buckets[i].cachedPage = null; } open = false; @@ -343,11 +357,11 @@ public class VirtualBufferCache implements IVirtualBufferCache { public String dumpState() { StringBuilder sb = new StringBuilder(); - sb.append(String.format("Page size = %d\n", pageSize)); - sb.append(String.format("Capacity = %d\n", numPages)); - sb.append(String.format("Allocated pages = %d\n", pages.size())); - sb.append(String.format("Allocated large pages = %d\n", largePages.get())); - sb.append(String.format("Next free page = %d\n", nextFree)); + sb.append(String.format("Page size = %d%n", pageSize)); + sb.append(String.format("Page budget = %d%n", pageBudget)); + sb.append(String.format("Used pages = %d%n", used.get())); + sb.append(String.format("Used large pages = %d%n", largePages.get())); + sb.append(String.format("Available free pages = %d%n", freePages.size())); return sb.toString(); } @@ -356,11 +370,6 @@ public class VirtualBufferCache implements IVirtualBufferCache { return fileMapManager; } - @Override - public boolean isFull() { - return (nextFree + largePages.get()) >= numPages; - } - private static class CacheBucket { private final ReentrantLock bucketLock; private VirtualPage cachedPage; @@ -376,14 +385,6 @@ public class VirtualBufferCache implements IVirtualBufferCache { } @Override - public void adviseWontNeed(ICachedPage page) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.log(Level.INFO, "Calling adviseWontNeed on " + this.getClass().getName() - + " makes no sense as this BufferCache cannot evict pages"); - } - } - - @Override public void returnPage(ICachedPage page) { } @@ -410,11 +411,6 @@ public class VirtualBufferCache implements IVirtualBufferCache { } @Override - public void setPageDiskId(ICachedPage page, long dpid) { - - } - - @Override public void returnPage(ICachedPage page, boolean reinsert) { throw new UnsupportedOperationException("Virtual buffer caches don't have FIFO writers"); } @@ -449,7 +445,7 @@ public class VirtualBufferCache 
implements IVirtualBufferCache { map.put("class", getClass().getSimpleName()); map.put("allocator", allocator.toString()); map.put("pageSize", pageSize); - map.put("numPages", numPages); + map.put("pageBudget", pageBudget); map.put("open", open); return map; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java index cf40a7a..77dc751 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java @@ -147,7 +147,7 @@ public class InMemoryInvertedIndex implements IInPlaceInvertedIndex { @Override public long getMemoryAllocationSize() { IBufferCache virtualBufferCache = btree.getBufferCache(); - return virtualBufferCache.getNumPages() * virtualBufferCache.getPageSize(); + return (long) virtualBufferCache.getPageBudget() * virtualBufferCache.getPageSize(); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java index ee7c827..750a2fa 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java @@ -762,7 +762,7 @@ public class RTree extends AbstractTreeIndex { } public RTreeAccessor createAccessor(IModificationOperationCallback modificationCallback, - ISearchOperationCallback searchCallback, int[] nonIndexFields) { + ISearchOperationCallback searchCallback, int[] nonIndexFields) { return new RTreeAccessor(this, modificationCallback, searchCallback, nonIndexFields); } @@ -1008,7 +1008,7 @@ public class RTree extends AbstractTreeIndex { int finalPageId = freePageManager.takePage(metaFrame); n.pageId = finalPageId; - bufferCache.setPageDiskId(n.page, BufferedFileHandle.getDiskPageId(getFileId(), finalPageId)); + n.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId)); //else we are looking at a leaf } //set next guide MBR @@ -1071,9 +1071,8 @@ public class RTree extends AbstractTreeIndex { } else { prevNodeFrontierPages.set(level, finalPageId); } - bufferCache.setPageDiskId(frontier.page, BufferedFileHandle.getDiskPageId(getFileId(), finalPageId)); + frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId)); pagesToWrite.add(frontier.page); - lowerFrame = prevInteriorFrame; lowerFrame.setPage(frontier.page); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java index fcea8e0..d0f4965 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java @@ -138,7 +138,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { } @Override - public int getNumPages() { + public int getPageBudget() { return pageReplacementStrategy.getMaxAllowedNumPages(); } @@ -161,33 +161,6 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { } @Override - public ICachedPage tryPin(long dpid) throws HyracksDataException { - // Calling the pinSanityCheck should be used only for debugging, since - // the synchronized block over the fileInfoMap is a hot spot. - if (DEBUG) { - pinSanityCheck(dpid); - } - CachedPage cPage = null; - int hash = hash(dpid); - CacheBucket bucket = pageMap[hash]; - bucket.bucketLock.lock(); - try { - cPage = bucket.cachedPage; - while (cPage != null) { - if (cPage.dpid == dpid) { - cPage.pinCount.incrementAndGet(); - pageReplacementStrategy.notifyCachePageAccess(cPage); - return cPage; - } - cPage = cPage.next; - } - } finally { - bucket.bucketLock.unlock(); - } - return cPage; - } - - @Override public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException { // Calling the pinSanityCheck should be used only for debugging, since // the synchronized block over the fileInfoMap is a hot spot. 
@@ -978,7 +951,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { } @Override - public void flushDirtyPage(ICachedPage page) throws HyracksDataException { + public void flush(ICachedPage page) throws HyracksDataException { // Assumes the caller has pinned the page. cleanerThread.cleanPage((CachedPage) page, true); } @@ -1177,11 +1150,6 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { } @Override - public void adviseWontNeed(ICachedPage page) { - pageReplacementStrategy.adviseWontNeed((ICachedPageInternal) page); - } - - @Override public ICachedPage confiscatePage(long dpid) throws HyracksDataException { return confiscatePage(dpid, 1); } @@ -1329,17 +1297,21 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { finishQueue(); if (cycleCount > MAX_PIN_ATTEMPT_CYCLES) { cycleCount = 0; // suppress warning below - throw new HyracksDataException("Unable to find free page in buffer cache after " - + MAX_PIN_ATTEMPT_CYCLES + " cycles (buffer cache undersized?)" - + (DEBUG ? " ; " + (masterPinCount.get() - startingPinCount) - + " successful pins since start of cycle" : "")); + throw new HyracksDataException( + "Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES + + " cycles (buffer cache undersized?)" + (DEBUG + ? " ; " + (masterPinCount.get() - startingPinCount) + + " successful pins since start of cycle" + : "")); } } } finally { if (cycleCount > PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD && LOGGER.isLoggable(Level.WARNING)) { LOGGER.warning("Took " + cycleCount + " cycles to find free page in buffer cache. (buffer cache " - + "undersized?)" + (DEBUG ? " ; " + (masterPinCount.get() - startingPinCount) - + " successful pins since start of cycle" : "")); + + "undersized?)" + (DEBUG + ? 
" ; " + (masterPinCount.get() - startingPinCount) + + " successful pins since start of cycle" + : "")); } } } @@ -1402,13 +1374,8 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { } @Override - public void setPageDiskId(ICachedPage page, long dpid) { - ((CachedPage) page).dpid = dpid; - } - - @Override public IFIFOPageQueue createFIFOQueue() { - return fifoWriter.createQueue(FIFOLocalWriter.instance()); + return fifoWriter.createQueue(FIFOLocalWriter.INSTANCE); } @Override @@ -1430,10 +1397,6 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { } @Override - /** - * _ONLY_ call this if you absolutely, positively know this file has no dirty pages in the cache! - * Bypasses the normal lifecycle of a file handle and evicts all references to it immediately. - */ public void purgeHandle(int fileId) throws HyracksDataException { synchronized (fileInfoMap) { BufferedFileHandle fh = fileInfoMap.get(fileId); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java index 76bbd4c..bc0a04e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java @@ -46,7 +46,7 @@ public class CachedPage implements ICachedPageInternal { private final StackTraceElement[] ctorStack; //Constructor for making dummy entry for FIFO queue - public CachedPage(){ + public 
CachedPage() { this.cpid = -1; this.buffer = null; this.pageReplacementStrategy = null; @@ -55,7 +55,7 @@ public class CachedPage implements ICachedPageInternal { pinCount = null; queueInfo = null; replacementStrategyObject = null; - latch =null; + latch = null; ctorStack = DEBUG ? new Throwable().getStackTrace() : null; } @@ -195,4 +195,14 @@ public class CachedPage implements ICachedPageInternal { void setNext(CachedPage next) { this.next = next; } + + @Override + public void setDiskPageId(long dpid) { + this.dpid = dpid; + } + + @Override + public boolean isLargePage() { + return multiplier > 1; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java index 8f7a965..f3de1c1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java @@ -77,11 +77,6 @@ public class DebugBufferCache implements IBufferCache { } @Override - public ICachedPage tryPin(long dpid) throws HyracksDataException { - return bufferCache.tryPin(dpid); - } - - @Override public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException { ICachedPage page = bufferCache.pin(dpid, newPage); pinCount.addAndGet(1); @@ -105,8 +100,8 @@ public class DebugBufferCache implements IBufferCache { } @Override - public int getNumPages() { - return bufferCache.getNumPages(); + public int 
getPageBudget() { + return bufferCache.getPageBudget(); } @Override @@ -168,8 +163,8 @@ public class DebugBufferCache implements IBufferCache { } @Override - public void flushDirtyPage(ICachedPage page) throws HyracksDataException { - bufferCache.flushDirtyPage(page); + public void flush(ICachedPage page) throws HyracksDataException { + bufferCache.flush(page); } @Override @@ -183,11 +178,6 @@ public class DebugBufferCache implements IBufferCache { } @Override - public void adviseWontNeed(ICachedPage page) { - bufferCache.adviseWontNeed(page); - } - - @Override public ICachedPage confiscatePage(long dpid) throws HyracksDataException { return bufferCache.confiscatePage(dpid); } @@ -214,12 +204,6 @@ public class DebugBufferCache implements IBufferCache { } @Override - public void setPageDiskId(ICachedPage page, long dpid) { - // TODO Auto-generated method stub - - } - - @Override public void returnPage(ICachedPage page, boolean reinsert) { // TODO Auto-generated method stub http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java index 6774ddd..9d0b728 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java @@ -18,19 +18,15 @@ package org.apache.hyracks.storage.common.buffercache; import org.apache.hyracks.api.exceptions.HyracksDataException; public class FIFOLocalWriter 
implements IFIFOPageWriter { - private static FIFOLocalWriter instance; + public static final FIFOLocalWriter INSTANCE = new FIFOLocalWriter(); private static boolean DEBUG = false; - public static FIFOLocalWriter instance() { - if(instance == null) { - instance = new FIFOLocalWriter(); - } - return instance; + private FIFOLocalWriter() { } @Override public void write(ICachedPage page, BufferCache bufferCache) throws HyracksDataException { - CachedPage cPage = (CachedPage)page; + CachedPage cPage = (CachedPage) page; try { bufferCache.write(cPage); } finally { @@ -43,6 +39,6 @@ public class FIFOLocalWriter implements IFIFOPageWriter { @Override public void sync(int fileId, BufferCache bufferCache) throws HyracksDataException { - bufferCache.force(fileId,true); + bufferCache.force(fileId, true); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java index 789f7b7..28801ea 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java @@ -88,50 +88,185 @@ public interface IBufferCache { */ void deleteFile(FileReference file) throws HyracksDataException; - ICachedPage tryPin(long dpid) throws HyracksDataException; - + /** + * Pin the page so it can't be evicted from the buffer cache... 
+ * + * @param dpid + * page id is a unique id that is a combination of file id and page id + * @param newPage + * whether this page is expected to be new. + * NOTE: undefined: + * -- what if the flag is true but the page exists? + * -- what if the flag is false but the page doesn't exist + * @return the pinned page + * @throws HyracksDataException + */ ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException; + /** + * Unpin a pinned page so its buffer can be recycled + * + * @param page + * the page + * @throws HyracksDataException + */ void unpin(ICachedPage page) throws HyracksDataException; - void flushDirtyPage(ICachedPage page) throws HyracksDataException; + /** + * Flush the page if it is dirty + * + * @param page + * the page to flush + * @throws HyracksDataException + */ + void flush(ICachedPage page) throws HyracksDataException; - void adviseWontNeed(ICachedPage page); + /** + * Force bits that have been already flushed to disk + * This method doesn't flush all dirty pages to disk but simply calls the sync method on the filesystem api + * + * @param fileId + * the file id + * @param metadata + * whether metadata should be synced as well + * @throws HyracksDataException + */ + void force(int fileId, boolean metadata) throws HyracksDataException; + /** + * Take a page such that no one else has access to it + * + * @param dpid + * the unique (fileId,pageId) + * @return the confiscated page or null if no page is available + * @throws HyracksDataException + */ ICachedPage confiscatePage(long dpid) throws HyracksDataException; + /** + * + * @return the confiscated page or null if no page is available + * @throws HyracksDataException + */ + /** + * Take a large page such that no one else has access to it + * + * @param dpid + * the unique (fileId,pageId) + * @param multiplier + * how many multiples of the original page size + * @param extraBlockPageId + * the page id where the large block comes from + * @return + * the confiscated page or null if 
a large page couldn't be found + * @throws HyracksDataException + */ ICachedPage confiscateLargePage(long dpid, int multiplier, int extraBlockPageId) throws HyracksDataException; + /** + * Return and re-insert a confiscated page + * + * @param page + * the confiscated page + */ void returnPage(ICachedPage page); + /** + * Return a confiscated page + * + * @param page + * the confiscated page + * @param reinsert + * if true, return the page to the cache, otherwise, destroy + */ void returnPage(ICachedPage page, boolean reinsert); - void force(int fileId, boolean metadata) throws HyracksDataException; - + /** + * Get the standard page size + * + * @return the size in bytes + */ int getPageSize(); + /** + * Get the standard page size with header if any + * + * @return the sum of page size and header size in bytes + */ int getPageSizeWithHeader(); - int getNumPages(); + /** + * @return the maximum allowed pages in this buffer cache + */ + int getPageBudget(); + /** + * Get the number of pages used for a file + * + * @param fileId + * the file id + * @return the number of pages used for the file + * @throws HyracksDataException + */ int getNumPagesOfFile(int fileId) throws HyracksDataException; + /** + * Get the reference count for a file (num of open - num of close) + * + * @param fileId + * the file + * @return the reference count + */ int getFileReferenceCount(int fileId); + /** + * Close the buffer cache, all of its files, and release the memory taken by it + * The buffer cache is open upon successful instantiation and can't be re-opened + * + * @throws HyracksDataException + */ void close() throws HyracksDataException; + /** + * @return an instance of {@link IFIFOPageQueue} that can be used to write pages to the file + */ IFIFOPageQueue createFIFOQueue(); + /** + * Flush the queued pages written through buffer cache FIFO queues + */ void finishQueue(); - void setPageDiskId(ICachedPage page, long dpid); - + // TODO: remove the replication out of the buffer cache 
interface + /** + * @return true if replication is enabled, false otherwise + */ boolean isReplicationEnabled(); + /** + * @return the io replication manager + */ IIOReplicationManager getIOReplicationManager(); + /** + * Deletes the file and recycles all of its pages without flushing them. + * + * ONLY call this if you absolutely, positively know this file has no dirty pages in the cache! + * Bypasses the normal lifecycle of a file handle and evicts all references to it immediately. + */ void purgeHandle(int fileId) throws HyracksDataException; + /** + * Resize the page + * + * @param page + * the page to resize + * @param multiplier + * how many multiples of the original page size + * @param extraPageBlockHelper + * helper to determine the location of the resize block + * @throws HyracksDataException + */ void resizePage(ICachedPage page, int multiplier, IExtraPageBlockHelper extraPageBlockHelper) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java index abbe233..16837b9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java @@ -41,4 +41,13 @@ public interface ICachedPage { int getPageSize(); int getFrameSizeMultiplier(); + + void setDiskPageId(long dpid); + + /** + * Check if a page is a large page + * + * @return true if the page is large, false
otherwise + */ + boolean isLargePage(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IExtraPageBlockHelper.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IExtraPageBlockHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IExtraPageBlockHelper.java index ad7f2f6..607385a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IExtraPageBlockHelper.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IExtraPageBlockHelper.java @@ -21,6 +21,25 @@ package org.apache.hyracks.storage.common.buffercache; import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IExtraPageBlockHelper { + /** + * Get the page id of the free block of size + * + * @param size + * the size of the block + * @return + * the page id + * @throws HyracksDataException + */ int getFreeBlock(int size) throws HyracksDataException; + + /** + * Release the block at location blockPageId which has size size + * + * @param blockPageId + * the block page id + * @param size + * the size of the block + * @throws HyracksDataException + */ void returnFreePageBlock(int blockPageId, int size) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java index 0fe5767..6c03671 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java @@ -17,6 +17,7 @@ package org.apache.hyracks.storage.common.buffercache; import org.apache.hyracks.api.exceptions.HyracksDataException; +@FunctionalInterface public interface IFIFOPageQueue { - public void put(ICachedPage page) throws HyracksDataException; + void put(ICachedPage page) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java index cfca77a..139a3c4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java @@ -121,4 +121,14 @@ public class VirtualPage implements ICachedPage { this.buffer = buffer; } + @Override + public void setDiskPageId(long dpid) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isLargePage() { + return multiplier > 1; + } + } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/VirtualBufferCacheTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/VirtualBufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/VirtualBufferCacheTest.java index 5ff5a11..0e749fc 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/VirtualBufferCacheTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/VirtualBufferCacheTest.java @@ -25,49 +25,139 @@ import java.util.HashSet; import java.util.Random; import java.util.Set; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.util.SingleThreadEventProcessor; import org.apache.hyracks.control.nc.io.IOManager; import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache; import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator; import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator; import org.apache.hyracks.storage.common.buffercache.ICachedPage; +import org.apache.hyracks.storage.common.buffercache.IExtraPageBlockHelper; import org.apache.hyracks.storage.common.file.BufferedFileHandle; import org.apache.hyracks.test.support.TestStorageManagerComponentHolder; +import org.junit.Assert; import org.junit.Test; public class VirtualBufferCacheTest { - private static final long SEED = 123456789L; - private static final int 
NUM_OVERPIN = 128; - private static final int PAGE_SIZE = 256; - private static final int NUM_FILES = 10; - private static final int NUM_PAGES = 1000; + /* + * Missing tests: + * 0. concurrent pinnings for a single file from multiple threads + * 1. concurrent create file + * 2. file deletes while pages are pinned? Note that currently, the vbc doesn't keep track of number of pinnings + */ + private static class TestExtraPageBlockHelper implements IExtraPageBlockHelper { + private final int fileId; + private int pinCount; + private Set<ICachedPage> pinnedPages; + private int totalNumPages; - private final Random random; - private final FileState[] fileStates; + public TestExtraPageBlockHelper(int fileId) { + this.fileId = fileId; + pinCount = 0; + pinnedPages = new HashSet<>(); + } - private VirtualBufferCache vbc; - private IOManager ioManager; + @Override + public int getFreeBlock(int size) throws HyracksDataException { + int before = totalNumPages; + totalNumPages += size - 1; + return before; + } - public VirtualBufferCacheTest() { - fileStates = new FileState[NUM_FILES]; - for (int i = 0; i < NUM_FILES; i++) { - fileStates[i] = new FileState(); + @Override + public void returnFreePageBlock(int blockPageId, int size) throws HyracksDataException { + // Do nothing. 
we don't reclaim large pages from file in this test } - random = new Random(SEED); - vbc = null; + + public void pin(VirtualBufferCache vbc, int multiplier) throws HyracksDataException { + ICachedPage p = vbc.pin(BufferedFileHandle.getDiskPageId(fileId, pinCount), true); + pinnedPages.add(p); + pinCount++; + totalNumPages++; + if (multiplier > 1) { + vbc.resizePage(p, multiplier, this); + } + } + } private static class FileState { - private int fileId; + private final VirtualBufferCache vbc; + private final int fileId; + private final TestExtraPageBlockHelper helper; private FileReference fileRef; - private int pinCount; - private Set<ICachedPage> pinnedPages; - public FileState() { - fileId = -1; - fileRef = null; - pinCount = 0; - pinnedPages = new HashSet<>(); + public FileState(VirtualBufferCache vbc, String fileName) throws HyracksDataException { + this.vbc = vbc; + IOManager ioManager = TestStorageManagerComponentHolder.getIOManager(); + fileRef = ioManager.resolve(fileName); + vbc.createFile(fileRef); + fileId = vbc.getFileMapProvider().lookupFileId(fileRef); + helper = new TestExtraPageBlockHelper(fileId); + } + + public void pin(int multiplier) throws HyracksDataException { + helper.pin(vbc, multiplier); + } + } + + private static class Request { + private enum Type { + PIN_PAGE, + CALLBACK + } + + private final Type type; + private boolean done; + + public Request(Type type) { + this.type = type; + done = false; + } + + Type getType() { + return type; + } + + synchronized void complete() { + done = true; + notifyAll(); + } + + synchronized void await() throws InterruptedException { + while (!done) { + wait(); + } + } + } + + public class User extends SingleThreadEventProcessor<Request> { + private final VirtualBufferCache vbc; + private final FileState fileState; + + public User(String name, VirtualBufferCache vbc, FileState fileState) throws HyracksDataException { + super(name); + this.vbc = vbc; + this.fileState = fileState; + } + + @Override + 
protected void handle(Request req) throws Exception { + try { + switch (req.getType()) { + case PIN_PAGE: + ICachedPage p = vbc.pin( + BufferedFileHandle.getDiskPageId(fileState.fileId, fileState.helper.pinCount), true); + fileState.helper.pinnedPages.add(p); + ++fileState.helper.pinCount; + break; + default: + break; + } + } finally { + req.complete(); + } } } @@ -79,60 +169,184 @@ public class VirtualBufferCacheTest { * of pages. */ @Test - public void test01() throws Exception { - TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES, NUM_FILES); - ioManager = TestStorageManagerComponentHolder.getIOManager(); + public void testDisjointPins() throws Exception { + final int numOverpin = 128; + final int pageSize = 256; + final int numFiles = 10; + final int numPages = 1000; + Random random = new Random(); ICacheMemoryAllocator allocator = new HeapBufferAllocator(); - vbc = new VirtualBufferCache(allocator, PAGE_SIZE, NUM_PAGES); + VirtualBufferCache vbc = new VirtualBufferCache(allocator, pageSize, numPages); vbc.open(); - createFiles(); + FileState[] fileStates = new FileState[numFiles]; + for (int i = 0; i < numFiles; i++) { + fileStates[i] = new FileState(vbc, String.format("f%d", i)); + } - kPins(NUM_PAGES); - assertTrue(pagesDisjointed()); + kPins(numPages, numFiles, fileStates, vbc, random); + assertTrue(pagesDisjointed(numFiles, fileStates)); - kPins(NUM_OVERPIN); - assertTrue(pagesDisjointed()); + kPins(numOverpin, numFiles, fileStates, vbc, random); + assertTrue(pagesDisjointed(numFiles, fileStates)); - deleteFiles(); + deleteFilesAndCheckMemory(numFiles, fileStates, vbc); vbc.close(); } - private boolean pagesDisjointed() { + @Test + public void testConcurrentUsersDifferentFiles() throws Exception { + final int numOverpin = 128; + final int pageSize = 256; + final int numFiles = 10; + final int numPages = 1000; + Random random = new Random(); + ICacheMemoryAllocator allocator = new HeapBufferAllocator(); + VirtualBufferCache vbc = new 
VirtualBufferCache(allocator, pageSize, numPages); + vbc.open(); + FileState[] fileStates = new FileState[numFiles]; + User[] users = new User[numFiles]; + for (int i = 0; i < numFiles; i++) { + fileStates[i] = new FileState(vbc, String.format("f%d", i)); + users[i] = new User("User-" + i, vbc, fileStates[i]); + } + for (int i = 0; i < numPages; i++) { + int fsIdx = random.nextInt(numFiles); + users[fsIdx].add(new Request(Request.Type.PIN_PAGE)); + } + // ensure all are done + wait(users); + assertTrue(pagesDisjointed(numFiles, fileStates)); + for (int i = 0; i < numOverpin; i++) { + int fsIdx = random.nextInt(numFiles); + users[fsIdx].add(new Request(Request.Type.PIN_PAGE)); + } + // ensure all are done + wait(users); + assertTrue(pagesDisjointed(numFiles, fileStates)); + // shutdown users + shutdown(users); + deleteFilesAndCheckMemory(numFiles, fileStates, vbc); + vbc.close(); + } + + private void shutdown(User[] users) throws HyracksDataException, InterruptedException { + for (int i = 0; i < users.length; i++) { + users[i].stop(); + } + } + + private void wait(User[] users) throws InterruptedException { + for (int i = 0; i < users.length; i++) { + Request callback = new Request(Request.Type.CALLBACK); + users[i].add(callback); + callback.await(); + } + } + + @Test + public void testLargePages() throws Exception { + final int pageSize = 256; + final int numFiles = 3; + final int numPages = 1000; + ICacheMemoryAllocator allocator = new HeapBufferAllocator(); + VirtualBufferCache vbc = new VirtualBufferCache(allocator, pageSize, numPages); + vbc.open(); + FileState[] fileStates = new FileState[numFiles]; + for (int i = 0; i < numFiles; i++) { + fileStates[i] = new FileState(vbc, String.format("f%d", i)); + } + // Get a large page that is 52 pages size + int fileIdx = 0; + FileState f = fileStates[fileIdx]; + f.pin(52); + // Assert that 52 pages are accounted for + Assert.assertEquals(52, vbc.getUsage()); + // Delete file + vbc.deleteFile(f.fileId); + // Assert that 
usage fell down to 0 + Assert.assertEquals(0, vbc.getUsage()); + // Assert that no pages are pre-allocated + Assert.assertEquals(0, vbc.getPreAllocatedPages()); + // Next file + fileIdx++; + f = fileStates[fileIdx]; + // Pin small pages to capacity + int count = 0; + while (vbc.getUsage() <= vbc.getPageBudget()) { + f.pin(1); + count++; + Assert.assertEquals(count, vbc.getUsage()); + } + // Delete file + vbc.deleteFile(f.fileRef); + // Assert that usage fell down to 0 + Assert.assertEquals(0, vbc.getUsage()); + // Assert that small pages are available + Assert.assertEquals(vbc.getPreAllocatedPages(), vbc.getPageBudget()); + // Next file + fileIdx++; + f = fileStates[fileIdx]; + count = 0; + int sizeOfLargePage = 4; + while (vbc.getUsage() <= vbc.getPageBudget()) { + f.pin(sizeOfLargePage); + count += sizeOfLargePage; + Assert.assertEquals(count, vbc.getUsage()); + Assert.assertEquals(Integer.max(0, vbc.getPageBudget() - count), vbc.getPreAllocatedPages()); + } + // Delete file + vbc.deleteFile(f.fileId); + // Assert that usage fell down to 0 + Assert.assertEquals(0, vbc.getUsage()); + // Assert that no pages are pre-allocated + Assert.assertEquals(0, vbc.getPreAllocatedPages()); + vbc.close(); + } + + private boolean pagesDisjointed(int numFiles, FileState[] fileStates) { boolean disjoint = true; - for (int i = 0; i < NUM_FILES; i++) { + for (int i = 0; i < numFiles; i++) { FileState fi = fileStates[i]; - for (int j = i + 1; j < NUM_FILES; j++) { + for (int j = i + 1; j < numFiles; j++) { FileState fj = fileStates[j]; - disjoint = disjoint && Collections.disjoint(fi.pinnedPages, fj.pinnedPages); + disjoint = disjoint && Collections.disjoint(fi.helper.pinnedPages, fj.helper.pinnedPages); } } return disjoint; } - private void createFiles() throws Exception { - for (int i = 0; i < NUM_FILES; i++) { - FileState f = fileStates[i]; - String fName = String.format("f%d", i); - f.fileRef = ioManager.resolve(fName); - vbc.createFile(f.fileRef); - f.fileId = 
vbc.getFileMapProvider().lookupFileId(f.fileRef); + private void deleteFilesAndCheckMemory(int numFiles, FileState[] fileStates, VirtualBufferCache vbc) + throws Exception { + // Get the size of the buffer cache + int totalInStates = 0; + for (int i = 0; i < numFiles; i++) { + totalInStates += fileStates[i].helper.pinnedPages.size(); } - } - - private void deleteFiles() throws Exception { - for (int i = 0; i < NUM_FILES; i++) { + Assert.assertEquals(totalInStates, vbc.getUsage()); + int totalFree = 0; + Assert.assertEquals(totalFree, vbc.getPreAllocatedPages()); + boolean hasLargePages = vbc.getLargePages() > 0; + for (int i = 0; i < numFiles; i++) { + int expectedToBeReclaimed = 0; + for (ICachedPage page : fileStates[i].helper.pinnedPages) { + expectedToBeReclaimed += page.getFrameSizeMultiplier(); + } vbc.deleteFile(fileStates[i].fileId); + totalFree += expectedToBeReclaimed; + Assert.assertEquals(totalInStates - totalFree, vbc.getUsage()); + if (!hasLargePages) { + Assert.assertEquals(Integer.max(0, vbc.getPageBudget() - vbc.getUsage()), vbc.getPreAllocatedPages()); + } } } - private void kPins(int k) throws Exception { + private void kPins(int k, int numFiles, FileState[] fileStates, VirtualBufferCache vbc, Random random) + throws Exception { int numPinned = 0; while (numPinned < k) { - int fsIdx = random.nextInt(NUM_FILES); + int fsIdx = random.nextInt(numFiles); FileState f = fileStates[fsIdx]; - ICachedPage p = vbc.pin(BufferedFileHandle.getDiskPageId(f.fileId, f.pinCount), true); - f.pinnedPages.add(p); - ++f.pinCount; + f.pin(1); ++numPinned; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java index e34ee0e..26ad457 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java @@ -78,10 +78,6 @@ public class BufferCacheTest { ICachedPage page = null; - // tryPin should fail - page = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, testPageId)); - Assert.assertNull(page); - // pin page should succeed page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, testPageId), true); page.acquireWriteLatch(); @@ -89,12 +85,6 @@ public class BufferCacheTest { for (int i = 0; i < num; i++) { page.getBuffer().putInt(i * 4, i); } - - // try pin should succeed - ICachedPage page2 = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, testPageId)); - Assert.assertNotNull(page2); - bufferCache.unpin(page2); - } finally { page.releaseWriteLatch(true); bufferCache.unpin(page); @@ -102,31 +92,11 @@ public class BufferCacheTest { bufferCache.closeFile(fileId); - // This code is commented because the method pinSanityCheck in the BufferCache is commented. 
- /*boolean exceptionThrown = false; - - // tryPin should fail since file is not open - try { - page = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, testPageId)); - } catch (HyracksDataException e) { - exceptionThrown = true; - } - Assert.assertTrue(exceptionThrown); - - // pin should fail since file is not open - exceptionThrown = false; - try { - page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, testPageId), false); - } catch (HyracksDataException e) { - exceptionThrown = true; - } - Assert.assertTrue(exceptionThrown);*/ - // open file again bufferCache.openFile(fileId); // pin should succeed because page should still be cached - page = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, testPageId)); + page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, testPageId), false); Assert.assertNotNull(page); page.acquireReadLatch(); try {
