Repository: hive Updated Branches: refs/heads/master eb7647979 -> 9cb545507
HIVE-14446 : Add switch to control BloomFilter in Hybrid grace hash join and make the FPP adjustable (Wei Zheng, reviewed by Gopal V) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9cb54550 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9cb54550 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9cb54550 Branch: refs/heads/master Commit: 9cb54550751572d4187dd1891675b6fc48a7fff8 Parents: eb76479 Author: Wei Zheng <[email protected]> Authored: Wed Aug 24 11:17:53 2016 -0700 Committer: Wei Zheng <[email protected]> Committed: Wed Aug 24 11:17:53 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 + .../hive/common/util/TestBloomFilter.java | 6 +++ .../persistence/HybridHashTableContainer.java | 57 +++++++++++++++----- 3 files changed, 52 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/9cb54550/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index da6e97a..14a538b 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1051,6 +1051,8 @@ public class HiveConf extends Configuration { "Optimized hashtable (see hive.mapjoin.optimized.hashtable) uses a chain of buffers to\n" + "store data. This is one buffer size. HT may be slightly faster if this is larger, but for small\n" + "joins unnecessary memory will be allocated and then trimmed."), + HIVEHYBRIDGRACEHASHJOINBLOOMFILTER("hive.mapjoin.hybridgrace.bloomfilter", true, "Whether to " + + "use BloomFilter in Hybrid grace hash join to minimize unnecessary spilling."), HIVESMBJOINCACHEROWS("hive.smbjoin.cache.rows", 10000, "How many rows with the same key value should be cached in memory per smb joined table."), http://git-wip-us.apache.org/repos/asf/hive/blob/9cb54550/common/src/test/org/apache/hive/common/util/TestBloomFilter.java ---------------------------------------------------------------------- diff --git a/common/src/test/org/apache/hive/common/util/TestBloomFilter.java b/common/src/test/org/apache/hive/common/util/TestBloomFilter.java index 7c2a941..63c7050 100644 --- a/common/src/test/org/apache/hive/common/util/TestBloomFilter.java +++ b/common/src/test/org/apache/hive/common/util/TestBloomFilter.java @@ -70,6 +70,12 @@ public class TestBloomFilter { assertEquals(729844, BloomFilter.optimalNumOfBits(100000, 0.03)); assertEquals(7298440, BloomFilter.optimalNumOfBits(1000000, 0.03)); assertEquals(6235224, BloomFilter.optimalNumOfBits(1000000, 0.05)); + assertEquals(1870567268, BloomFilter.optimalNumOfBits(300000000, 0.05)); + assertEquals(1437758756, BloomFilter.optimalNumOfBits(300000000, 0.1)); + assertEquals(432808512, BloomFilter.optimalNumOfBits(300000000, 0.5)); + assertEquals(1393332198, BloomFilter.optimalNumOfBits(3000000000L, 0.8)); + assertEquals(657882327, BloomFilter.optimalNumOfBits(3000000000L, 0.9)); + assertEquals(0, BloomFilter.optimalNumOfBits(3000000000L, 1)); } @Test http://git-wip-us.apache.org/repos/asf/hive/blob/9cb54550/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index e4a2b35..573dc08 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -111,7 +111,8 @@ public class HybridHashTableContainer * This is a cheap exit option to prevent spilling the big-table in such a * scenario. */ - private transient final BloomFilter bloom1; + private transient BloomFilter bloom1 = null; + private final int BLOOM_FILTER_MAX_SIZE = 300000000; private final List<Object> EMPTY_LIST = new ArrayList<Object>(0); @@ -276,14 +277,15 @@ public class HybridHashTableContainer HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE), HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS), HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT), + HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINBLOOMFILTER), estimatedTableSize, keyCount, memoryAvailable, nwayConf, HiveUtils.getLocalDirList(hconf)); } private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor, int memCheckFreq, int minWbSize, int maxWbSize, int minNumParts, float probePercent, - long estimatedTableSize, long keyCount, long memoryAvailable, HybridHashTableConf nwayConf, - String spillLocalDirs) + boolean useBloomFilter, long estimatedTableSize, long keyCount, long memoryAvailable, + HybridHashTableConf nwayConf, String spillLocalDirs) throws SerDeException, IOException { directWriteHelper = new MapJoinBytesTableContainer.DirectKeyValueWriter(); @@ -331,18 +333,29 @@ public class HybridHashTableContainer // We also want to limit the size of writeBuffer, because we normally have 16 partitions, that // makes spilling prediction (isMemoryFull) to be too defensive which results in unnecessary spilling writeBufferSize = writeBufferSize < minWbSize ? minWbSize : Math.min(maxWbSize / numPartitions, writeBufferSize); + LOG.info("Write buffer size: " + writeBufferSize); + memoryUsed = 0; - this.bloom1 = new BloomFilter(newKeyCount); - - if (LOG.isInfoEnabled()) { + if (useBloomFilter) { + if (newKeyCount <= BLOOM_FILTER_MAX_SIZE) { + this.bloom1 = new BloomFilter(newKeyCount); + } else { + // To avoid having a huge BloomFilter we need to scale up False Positive Probability + double fpp = calcFPP(newKeyCount); + assert fpp < 1 : "Too many keys! BloomFilter False Positive Probability is 1!"; + if (fpp >= 0.5) { + LOG.warn("BloomFilter FPP is greater than 0.5!"); + } + LOG.info("BloomFilter is using FPP: " + fpp); + this.bloom1 = new BloomFilter(newKeyCount, fpp); + } LOG.info(String.format("Using a bloom-1 filter %d keys of size %d bytes", - newKeyCount, bloom1.sizeInBytes())); - LOG.info("Write buffer size: " + writeBufferSize); + newKeyCount, bloom1.sizeInBytes())); + memoryUsed = bloom1.sizeInBytes(); } hashPartitions = new HashPartition[numPartitions]; int numPartitionsSpilledOnCreation = 0; - memoryUsed = bloom1.sizeInBytes(); int initialCapacity = Math.max(newKeyCount / numPartitions, threshold / numPartitions); // maxCapacity should be calculated based on a percentage of memoryThreshold, which is to divide // row size using long size @@ -406,6 +419,22 @@ public class HybridHashTableContainer } } + /** + * Calculate the proper False Positive Probability so that the BloomFilter won't grow too big + * @param keyCount number of keys + * @return FPP + */ + private double calcFPP(int keyCount) { + int n = keyCount; + double p = 0.05; + + // Calculation below is consistent with BloomFilter.optimalNumOfBits(). + // Also, we are capping the BloomFilter size below 100 MB (800000000/8) + while ((-n * Math.log(p) / (Math.log(2) * Math.log(2))) > 800000000) { + p += 0.05; + } + return p; + } public MapJoinBytesTableContainer.KeyValueHelper getWriteHelper() { return writeHelper; @@ -424,7 +453,7 @@ public class HybridHashTableContainer * @return current memory usage */ private long refreshMemoryUsed() { - long memUsed = bloom1.sizeInBytes(); + long memUsed = bloom1 != null ? bloom1.sizeInBytes() : 0; for (HashPartition hp : hashPartitions) { if (hp.hashMap != null) { memUsed += hp.hashMap.memorySize(); @@ -477,7 +506,9 @@ public class HybridHashTableContainer int partitionId = keyHash & (hashPartitions.length - 1); HashPartition hashPartition = hashPartitions[partitionId]; - bloom1.addLong(keyHash); + if (bloom1 != null) { + bloom1.addLong(keyHash); + } if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) { // destination on disk putToSidefile = true; @@ -909,7 +940,7 @@ public class HybridHashTableContainer public JoinUtil.JoinResult setFromOutput(Output output) throws HiveException { int keyHash = HashCodeUtil.murmurHash(output.getData(), 0, output.getLength()); - if (!bloom1.testLong(keyHash)) { + if (bloom1 != null && !bloom1.testLong(keyHash)) { /* * if the keyHash is missing in the bloom filter, then the value cannot * exist in any of the spilled partition - return NOMATCH @@ -1063,7 +1094,7 @@ public class HybridHashTableContainer int keyHash = HashCodeUtil.murmurHash(bytes, offset, length); partitionId = keyHash & (hashPartitions.length - 1); - if (!bloom1.testLong(keyHash)) { + if (bloom1 != null && !bloom1.testLong(keyHash)) { /* * if the keyHash is missing in the bloom filter, then the value cannot exist in any of the * spilled partition - return NOMATCH
