This is an automated email from the ASF dual-hosted git repository. larsh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 4a911a8 PHOENIX-5123 Avoid using MappedByteBuffers for server side GROUP BY. 4a911a8 is described below commit 4a911a8fde10c52da54321c328c5e594c996ccee Author: Lars Hofhansl <la...@apache.org> AuthorDate: Wed Feb 6 13:27:26 2019 -0800 PHOENIX-5123 Avoid using MappedByteBuffers for server side GROUP BY. --- .../apache/phoenix/cache/aggcache/SpillFile.java | 78 +++++++++--------- .../apache/phoenix/cache/aggcache/SpillMap.java | 94 ++++++++++++---------- 2 files changed, 88 insertions(+), 84 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java index 51aef98..a47cfdf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java @@ -51,27 +51,23 @@ public class SpillFile implements Closeable { private Map<Integer, TempFile> tempFiles; // Custom spill files directory private File spillFilesDirectory = null; - + // Wrapper class for a TempFile: File + RandomAccessFile - private static class TempFile implements Closeable{ - private RandomAccessFile rndFile; - private File file; - - public TempFile(File file, RandomAccessFile rndFile) { - this.file = file; - this.rndFile = rndFile; - } - - public FileChannel getChannel() { - return rndFile.getChannel(); - } + private static class TempFile implements Closeable { + private final RandomAccessFile rndFile; + private final File file; + + public TempFile(File file, RandomAccessFile rndFile) { + this.file = file; + this.rndFile = rndFile; + } + + @Override + public void close() throws IOException { + Closeables.closeQuietly(rndFile.getChannel()); + Closeables.closeQuietly(rndFile); - @Override - public void close() throws IOException { - Closeables.closeQuietly(rndFile.getChannel()); - Closeables.closeQuietly(rndFile); - - if (file != null) { + if (file != null) { if (logger.isDebugEnabled()) { logger.debug("Deleting tempFile: " + file.getAbsolutePath()); } @@ -79,9 +75,9 @@ public class SpillFile implements Closeable { file.delete(); } catch (SecurityException e) { logger.warn("IOException thrown while closing Closeable." + e); - } + } } - } + } } private SpillFile(File spillFilesDirectory) throws IOException { @@ -120,29 +116,29 @@ public class SpillFile implements Closeable { /** * Random access to a page of the current spill file * @param index + * @return a file seeked to the correct page */ - public MappedByteBuffer getPage(int index) { + public RandomAccessFile getPage(int index) { try { - TempFile tempFile = null; - int fileIndex = 0; - - long offset = (long) index * (long) DEFAULT_PAGE_SIZE; - if(offset >= SPILL_FILE_SIZE) { - // Offset exceeds the first SpillFile size - // Get the index of the file that should contain the pageID - fileIndex = (int)(offset / SPILL_FILE_SIZE); - if(!tempFiles.containsKey(fileIndex)) { - // Dynamically add new spillFiles if directory grows beyond - // max page ID. - tempFile = createTempFile(); - tempFiles.put(fileIndex, tempFile); - } - } - tempFile = tempFiles.get(fileIndex); - // Channel gets buffered in file object - FileChannel fc = tempFile.getChannel(); + TempFile tempFile = null; + int fileIndex = 0; - return fc.map(MapMode.READ_WRITE, offset, DEFAULT_PAGE_SIZE); + long offset = (long) index * (long) DEFAULT_PAGE_SIZE; + if (offset >= SPILL_FILE_SIZE) { + // Offset exceeds the first SpillFile size + // Get the index of the file that should contain the pageID + fileIndex = (int) (offset / SPILL_FILE_SIZE); + if (!tempFiles.containsKey(fileIndex)) { + // Dynamically add new spillFiles if directory grows beyond + // max page ID. + tempFile = createTempFile(); + tempFiles.put(fileIndex, tempFile); + } + } + tempFile = tempFiles.get(fileIndex); + RandomAccessFile file = tempFile.rndFile; + file.seek(offset); + return file; } catch (IOException ioe) { // Close resource close(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java index bb4ce2e..cff1e44 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java @@ -19,8 +19,8 @@ package org.apache.phoenix.cache.aggcache; import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.BufferOverflowException; -import java.nio.MappedByteBuffer; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; @@ -55,7 +55,7 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements private int curMapBufferIndex; private SpillFile spillFile; // Directory of hash buckets --> extendible hashing implementation - private MappedByteBufferMap[] directory; + private FileMap[] directory; private final SpillableGroupByCache.QueryCache cache; public SpillMap(SpillFile file, int thresholdBytes, int estValueSize, SpillableGroupByCache.QueryCache cache) @@ -67,11 +67,11 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements // Init the e-hashing directory structure globalDepth = 1; - directory = new MappedByteBufferMap[(1 << globalDepth)]; + directory = new FileMap[(1 << globalDepth)]; for (int i = 0; i < directory.length; i++) { // Create an empty bucket list - directory[i] = new MappedByteBufferMap(i, this.thresholdBytes, pageInserts, file); + directory[i] = new FileMap(i, this.thresholdBytes, pageInserts, file); directory[i].flushBuffer(); } directory[0].pageIn(); @@ -93,7 +93,7 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements // for bucket splits private void redistribute(int index, ImmutableBytesPtr keyNew, byte[] valueNew) { // Get the respective bucket - MappedByteBufferMap byteMap = directory[index]; + FileMap byteMap = directory[index]; // Get the actual bucket index, that the directory index points to int mappedIdx = byteMap.pageIndex; @@ -119,8 +119,8 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements int b2Index = Math.max(index, tmpIndex); // Create two new split buckets - MappedByteBufferMap b1 = new MappedByteBufferMap(b1Index, thresholdBytes, pageInserts, spillFile); - MappedByteBufferMap b2 = new MappedByteBufferMap(b2Index, thresholdBytes, pageInserts, spillFile); + FileMap b1 = new FileMap(b1Index, thresholdBytes, pageInserts, spillFile); + FileMap b2 = new FileMap(b2Index, thresholdBytes, pageInserts, spillFile); // redistribute old elements into b1 and b2 for (Entry<ImmutableBytesPtr, byte[]> element : byteMap.pageMap.entrySet()) { @@ -182,7 +182,7 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements Preconditions.checkArgument(newDirSize < Integer.MAX_VALUE); // Double it! - MappedByteBufferMap[] newDirectory = new MappedByteBufferMap[newDirSize]; + FileMap[] newDirectory = new FileMap[newDirSize]; for (int i = 0; i < directory.length; i++) { newDirectory[i] = directory[i]; newDirectory[i + directory.length] = directory[i]; @@ -212,12 +212,12 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements byte[] value = null; int bucketIndex = getBucketIndex(ikey); - MappedByteBufferMap byteMap = directory[bucketIndex]; + FileMap byteMap = directory[bucketIndex]; // Decision based on bucket ID, not the directory ID due to the n:1 relationship if (directory[curMapBufferIndex].pageIndex != byteMap.pageIndex) { // map not paged in - MappedByteBufferMap curByteMap = directory[curMapBufferIndex]; + FileMap curByteMap = directory[curMapBufferIndex]; // Use bloomFilter to check if key was spilled before if (byteMap.containsKey(ikey.copyBytesIfNecessary())) { @@ -240,10 +240,10 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements private byte[] getAlways(ImmutableBytesPtr key) { byte[] value = null; int bucketIndex = getBucketIndex(key); - MappedByteBufferMap byteMap = directory[bucketIndex]; + FileMap byteMap = directory[bucketIndex]; if (directory[curMapBufferIndex].pageIndex != byteMap.pageIndex) { - MappedByteBufferMap curByteMap = directory[curMapBufferIndex]; + FileMap curByteMap = directory[curMapBufferIndex]; // ensure consistency and flush current memory page to disk curByteMap.flushBuffer(); @@ -266,7 +266,7 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements // page in element and replace if present byte[] spilledValue = getAlways(key); - MappedByteBufferMap byteMap = directory[curMapBufferIndex]; + FileMap byteMap = directory[curMapBufferIndex]; int index = curMapBufferIndex; // TODO: We split buckets until the new element fits onto a @@ -308,9 +308,9 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements * page for easy get() and update() calls on an individual key The class keeps track of the current size of the in * memory page and handles flushing and paging in respectively */ - private static class MappedByteBufferMap { - private SpillFile spillFile; - private int pageIndex; + private static class FileMap { + private final SpillFile spillFile; + private final int pageIndex; private final int thresholdBytes; private long totalResultSize; private boolean pagedIn; @@ -323,7 +323,7 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements // Used to determine is an element was written to this page before or not BloomFilter<byte[]> bFilter; - public MappedByteBufferMap(int id, int thresholdBytes, int pageInserts, SpillFile spillFile) { + public FileMap(int id, int thresholdBytes, int pageInserts, SpillFile spillFile) { this.spillFile = spillFile; // size threshold of a page this.thresholdBytes = thresholdBytes; @@ -363,24 +363,33 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements } // Flush the current page to the memory mapped byte buffer - private void flushBuffer() throws BufferOverflowException { + private void flushBuffer() { if (pagedIn) { - MappedByteBuffer buffer; // Only flush if page was changed if (dirtyPage) { Collection<byte[]> values = pageMap.values(); - buffer = spillFile.getPage(pageIndex); - buffer.clear(); + RandomAccessFile file = spillFile.getPage(pageIndex); // number of elements - buffer.putInt(values.size()); - for (byte[] value : values) { - // element length - buffer.putInt(value.length); - // element - buffer.put(value, 0, value.length); + try { + file.writeInt(values.size()); + int written = Bytes.SIZEOF_INT; + for (byte[] value : values) { + written += Bytes.SIZEOF_INT + value.length; + // safety check + if (written > SpillFile.DEFAULT_PAGE_SIZE) { + throw new BufferOverflowException(); + } + // element length + file.writeInt(value.length); + // element + file.write(value, 0, value.length); + } + } catch (IOException ioe) { + // Error during key access on spilled resource + // TODO rework error handling + throw new RuntimeException(ioe); } } - buffer = null; // Reset page stats pageMap.clear(); totalResultSize = 0; @@ -389,24 +398,23 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements dirtyPage = false; } - // load memory mapped region into a map for fast element access - private void pageIn() throws IndexOutOfBoundsException { + // load a page into a map for fast element access + private void pageIn() { if (!pagedIn) { - // Map the memory region - MappedByteBuffer buffer = spillFile.getPage(pageIndex); - int numElements = buffer.getInt(); + RandomAccessFile file = spillFile.getPage(pageIndex); + try { + int numElements = file.readInt(); for (int i = 0; i < numElements; i++) { - int kvSize = buffer.getInt(); + int kvSize = file.readInt(); byte[] data = new byte[kvSize]; - buffer.get(data, 0, kvSize); - try { - pageMap.put(SpillManager.getKey(data), data); - totalResultSize += (data.length + Bytes.SIZEOF_INT); - } catch (IOException ioe) { - // Error during key access on spilled resource - // TODO rework error handling - throw new RuntimeException(ioe); - } + file.readFully(data); + pageMap.put(SpillManager.getKey(data), data); + totalResultSize += (data.length + Bytes.SIZEOF_INT); + } + } catch (IOException ioe) { + // Error during key access on spilled resource + // TODO rework error handling + throw new RuntimeException(ioe); } pagedIn = true; dirtyPage = false;