Repository: hive
Updated Branches:
  refs/heads/master a88050bd9 -> cbebb4d78

HIVE-12837 : Better memory estimation/allocation for hybrid grace hash join during hash table loading (Wei Zheng, reviewed by Vikram Dixit K)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cbebb4d7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cbebb4d7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cbebb4d7

Branch: refs/heads/master
Commit: cbebb4d78064a9098e4145a0f7532f08885c9b27
Parents: a88050b
Author: Wei Zheng <w...@apache.org>
Authored: Wed May 4 23:09:08 2016 -0700
Committer: Wei Zheng <w...@apache.org>
Committed: Wed May 4 23:09:08 2016 -0700

----------------------------------------------------------------------
 .../persistence/HybridHashTableContainer.java   | 60 +++++++++++++++-----
 .../ql/exec/persistence/KeyValueContainer.java  |  4 ++
 2 files changed, 51 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cbebb4d7/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index f5da5a4..5552dfb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -90,6 +90,7 @@ public class HybridHashTableContainer
   private boolean lastPartitionInMem; // only one (last one) partition is left in memory
   private final int memoryCheckFrequency; // how often (# of rows apart) to check if memory is full
   private final HybridHashTableConf nwayConf; // configuration for n-way join
+  private int writeBufferSize; // write buffer size for BytesBytesMultiHashMap
 
   /** The OI used to deserialize values. We never deserialize keys. */
   private LazyBinaryStructObjectInspector internalValueOi;
@@ -294,7 +295,6 @@ public class HybridHashTableContainer
     this.spillLocalDirs = spillLocalDirs;
 
     this.nwayConf = nwayConf;
-    int writeBufferSize;
     int numPartitions;
     if (nwayConf == null) { // binary join
       numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize);
@@ -327,7 +327,9 @@ public class HybridHashTableContainer
         writeBufferSize : Integer.highestOneBit(writeBufferSize);
 
     // Cap WriteBufferSize to avoid large preallocations
-    writeBufferSize = writeBufferSize < minWbSize ? minWbSize : Math.min(maxWbSize, writeBufferSize);
+    // We also want to limit the write buffer size, because we normally have 16 partitions, which
+    // makes the spilling prediction (isMemoryFull) too defensive and results in unnecessary spilling
+    writeBufferSize = writeBufferSize < minWbSize ?
+        minWbSize : Math.min(maxWbSize / numPartitions, writeBufferSize);
 
     this.bloom1 = new BloomFilter(newKeyCount);
@@ -417,6 +419,11 @@ public class HybridHashTableContainer
     for (HashPartition hp : hashPartitions) {
       if (hp.hashMap != null) {
         memUsed += hp.hashMap.memorySize();
+      } else {
+        // also include the still-in-memory sidefile, before it has been truly spilled
+        if (hp.sidefileKVContainer != null) {
+          memUsed += hp.sidefileKVContainer.numRowsInReadBuffer() * tableRowSize;
+        }
       }
     }
     return memoryUsed = memUsed;
@@ -454,6 +461,8 @@ public class HybridHashTableContainer
 
   private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper,
           Writable currentKey, Writable currentValue) throws SerDeException, IOException {
+    boolean putToSidefile = false; // by default we put the row into the in-memory partition
+
     // Next, put row into corresponding hash partition
     int keyHash = keyValueHelper.getHashFromKey();
     int partitionId = keyHash & (hashPartitions.length - 1);
@@ -461,15 +470,13 @@ public class HybridHashTableContainer
 
     bloom1.addLong(keyHash);
 
-    if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) {
-      KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer();
-      kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue);
-    } else {
-      hashPartition.hashMap.put(keyValueHelper, keyHash); // Pass along hashcode to avoid recalculation
-      totalInMemRowCount++;
-
-      if ((totalInMemRowCount & (this.memoryCheckFrequency - 1)) == 0 && // check periodically
-          !lastPartitionInMem) { // If this is the only partition in memory, proceed without check
+    if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) { // destination on disk
+      putToSidefile = true;
+    } else { // destination in memory
+      if (!lastPartitionInMem && // If this is the only partition in memory, proceed without check
+          (hashPartition.size() == 0 || // Destination partition being empty indicates a write buffer
+                                        // will be allocated, thus we need to check if memory is full
+           (totalInMemRowCount & (this.memoryCheckFrequency - 1)) == 0)) { // check periodically
         if (isMemoryFull()) {
           if ((numPartitionsSpilled == hashPartitions.length - 1) ) {
             LOG.warn("This LAST partition in memory won't be spilled!");
@@ -479,9 +486,16 @@ public class HybridHashTableContainer
               int biggest = biggestPartition();
               spillPartition(biggest);
               this.setSpill(true);
+              if (partitionId == biggest) { // destination hash partition has just been spilled
+                putToSidefile = true;
+              }
             } else { // n-way join
               LOG.info("N-way spilling: spill tail partition from previously loaded small tables");
+              int biggest = nwayConf.getNextSpillPartition();
               memoryThreshold += nwayConf.spill();
+              if (biggest != 0 && partitionId == biggest) { // destination hash partition has just been spilled
+                putToSidefile = true;
+              }
               LOG.info("Memory threshold has been increased to: " + memoryThreshold);
             }
             numPartitionsSpilled++;
@@ -490,6 +504,15 @@ public class HybridHashTableContainer
       }
     }
 
+    // Now we know where to put the row
+    if (putToSidefile) {
+      KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer();
+      kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue);
+    } else {
+      hashPartition.hashMap.put(keyValueHelper, keyHash); // Pass along hashcode to avoid recalculation
+      totalInMemRowCount++;
+    }
+
     return null; // there's no key to return
   }
@@ -513,11 +536,21 @@ public class HybridHashTableContainer
   }
 
   /**
-   * Check if the memory threshold is reached
+   * Check if the memory threshold is about to be reached.
+   * Since all the write buffers are lazily allocated in BytesBytesMultiHashMap, we need to
+   * consider those as well.
    * @return true if memory is full, false if not
    */
   private boolean isMemoryFull() {
-    return refreshMemoryUsed() >= memoryThreshold;
+    int numPartitionsInMem = 0;
+
+    for (HashPartition hp : hashPartitions) {
+      if (!hp.isHashMapOnDisk()) {
+        numPartitionsInMem++;
+      }
+    }
+
+    return refreshMemoryUsed() + writeBufferSize * numPartitionsInMem >= memoryThreshold;
   }
 
   /**
@@ -561,6 +594,7 @@ public class HybridHashTableContainer
         new com.esotericsoftware.kryo.io.Output(outputStream);
     Kryo kryo = SerializationUtilities.borrowKryo();
     try {
+      LOG.info("Trying to spill hash partition " + partitionId + " ...");
       kryo.writeObject(output, partition.hashMap); // use Kryo to serialize hashmap
       output.close();
       outputStream.close();

http://git-wip-us.apache.org/repos/asf/hive/blob/cbebb4d7/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
index e2b22d3..72faf8b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
@@ -215,6 +215,10 @@ public class KeyValueContainer {
     return row;
   }
 
+  public int numRowsInReadBuffer() {
+    return rowsInReadBuffer;
+  }
+
   public int size() {
     return rowsInReadBuffer + rowsOnDisk;
   }
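
----------------------------------------------------------------------

For context, the two estimation rules this patch introduces can be read in
isolation. Below is a minimal, self-contained Java sketch of both; the
Partition class, the constants, and main() are illustrative stand-ins, not
Hive's actual HybridHashTableContainer API.

// Hypothetical sketch of the patch's two estimation rules; not Hive code.
public class HybridJoinMemorySketch {

  static final int MIN_WB_SIZE = 128 * 1024;       // assumed floor (128 KB)
  static final int MAX_WB_SIZE = 16 * 1024 * 1024; // assumed cap (16 MB)

  /** Stand-in for a hash partition: resident in memory or spilled to disk. */
  static class Partition {
    final boolean hashMapOnDisk;
    final long bytesUsed; // current in-memory footprint (hash map or sidefile)
    Partition(boolean hashMapOnDisk, long bytesUsed) {
      this.hashMapOnDisk = hashMapOnDisk;
      this.bytesUsed = bytesUsed;
    }
  }

  /**
   * Rule 1: derive the write buffer size from the estimated table size, round
   * down to a power of two, then cap it at MAX_WB_SIZE / numPartitions. With
   * the usual 16 partitions, an uncapped buffer would make the isMemoryFull
   * estimate below so pessimistic that it triggers unnecessary spilling.
   */
  static int calcWriteBufferSize(long estimatedTableSize, int numPartitions) {
    int wb = (int) Math.min(Integer.MAX_VALUE, estimatedTableSize / numPartitions);
    wb = Integer.highestOneBit(Math.max(1, wb)); // round down to a power of two
    return wb < MIN_WB_SIZE ? MIN_WB_SIZE : Math.min(MAX_WB_SIZE / numPartitions, wb);
  }

  /**
   * Rule 2: declare memory "full" when current usage plus one not-yet-allocated
   * write buffer per in-memory partition would cross the threshold. The write
   * buffers are allocated lazily, so they must be budgeted before they exist.
   */
  static boolean isMemoryFull(Partition[] partitions, int writeBufferSize, long memoryThreshold) {
    long used = 0;
    int numPartitionsInMem = 0;
    for (Partition p : partitions) {
      used += p.bytesUsed;
      if (!p.hashMapOnDisk) {
        numPartitionsInMem++;
      }
    }
    return used + (long) writeBufferSize * numPartitionsInMem >= memoryThreshold;
  }

  public static void main(String[] args) {
    int numPartitions = 16;
    int wb = calcWriteBufferSize(512L * 1024 * 1024, numPartitions); // 512 MB table
    Partition[] parts = new Partition[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      parts[i] = new Partition(false, 4L * 1024 * 1024); // 4 MB used per partition
    }
    System.out.println("writeBufferSize = " + wb); // 1 MB: capped at MAX_WB_SIZE / 16
    System.out.println("memory full? " + isMemoryFull(parts, wb, 128L * 1024 * 1024));
  }
}

The key design point is that BytesBytesMultiHashMap allocates its write
buffers lazily: an empty partition will claim writeBufferSize bytes on its
first insert, so budgeting one buffer per in-memory partition up front keeps
the isMemoryFull check from being surprised later. This is also why the put
path above checks memory whenever the destination partition is still empty,
not only at the periodic row-count interval.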