Repository: hive
Updated Branches:
  refs/heads/master eb7647979 -> 9cb545507


HIVE-14446 : Add switch to control BloomFilter in Hybrid grace hash join and 
make the FPP adjustable (Wei Zheng, reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9cb54550
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9cb54550
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9cb54550

Branch: refs/heads/master
Commit: 9cb54550751572d4187dd1891675b6fc48a7fff8
Parents: eb76479
Author: Wei Zheng <[email protected]>
Authored: Wed Aug 24 11:17:53 2016 -0700
Committer: Wei Zheng <[email protected]>
Committed: Wed Aug 24 11:17:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +
 .../hive/common/util/TestBloomFilter.java       |  6 +++
 .../persistence/HybridHashTableContainer.java   | 57 +++++++++++++++-----
 3 files changed, 52 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9cb54550/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index da6e97a..14a538b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1051,6 +1051,8 @@ public class HiveConf extends Configuration {
         "Optimized hashtable (see hive.mapjoin.optimized.hashtable) uses a 
chain of buffers to\n" +
         "store data. This is one buffer size. HT may be slightly faster if 
this is larger, but for small\n" +
         "joins unnecessary memory will be allocated and then trimmed."),
+    HIVEHYBRIDGRACEHASHJOINBLOOMFILTER("hive.mapjoin.hybridgrace.bloomfilter", 
true, "Whether to " +
+        "use BloomFilter in Hybrid grace hash join to minimize unnecessary 
spilling."),
 
     HIVESMBJOINCACHEROWS("hive.smbjoin.cache.rows", 10000,
         "How many rows with the same key value should be cached in memory per 
smb joined table."),

http://git-wip-us.apache.org/repos/asf/hive/blob/9cb54550/common/src/test/org/apache/hive/common/util/TestBloomFilter.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hive/common/util/TestBloomFilter.java 
b/common/src/test/org/apache/hive/common/util/TestBloomFilter.java
index 7c2a941..63c7050 100644
--- a/common/src/test/org/apache/hive/common/util/TestBloomFilter.java
+++ b/common/src/test/org/apache/hive/common/util/TestBloomFilter.java
@@ -70,6 +70,12 @@ public class TestBloomFilter {
     assertEquals(729844, BloomFilter.optimalNumOfBits(100000, 0.03));
     assertEquals(7298440, BloomFilter.optimalNumOfBits(1000000, 0.03));
     assertEquals(6235224, BloomFilter.optimalNumOfBits(1000000, 0.05));
+    assertEquals(1870567268, BloomFilter.optimalNumOfBits(300000000, 0.05));
+    assertEquals(1437758756, BloomFilter.optimalNumOfBits(300000000, 0.1));
+    assertEquals(432808512, BloomFilter.optimalNumOfBits(300000000, 0.5));
+    assertEquals(1393332198, BloomFilter.optimalNumOfBits(3000000000L, 0.8));
+    assertEquals(657882327, BloomFilter.optimalNumOfBits(3000000000L, 0.9));
+    assertEquals(0, BloomFilter.optimalNumOfBits(3000000000L, 1));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/9cb54550/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index e4a2b35..573dc08 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -111,7 +111,8 @@ public class HybridHashTableContainer
    * This is a cheap exit option to prevent spilling the big-table in such a
    * scenario.
    */
-  private transient final BloomFilter bloom1;
+  private transient BloomFilter bloom1 = null;
+  private final int BLOOM_FILTER_MAX_SIZE = 300000000;
 
   private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
 
@@ -276,14 +277,15 @@ public class HybridHashTableContainer
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
         HiveConf.getIntVar(hconf, 
HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
         HiveConf.getFloatVar(hconf, 
HiveConf.ConfVars.HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT),
+        HiveConf.getBoolVar(hconf, 
HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINBLOOMFILTER),
         estimatedTableSize, keyCount, memoryAvailable, nwayConf,
         HiveUtils.getLocalDirList(hconf));
   }
 
   private HybridHashTableContainer(float keyCountAdj, int threshold, float 
loadFactor,
       int memCheckFreq, int minWbSize, int maxWbSize, int minNumParts, float 
probePercent,
-      long estimatedTableSize, long keyCount, long memoryAvailable, 
HybridHashTableConf nwayConf,
-      String spillLocalDirs)
+      boolean useBloomFilter, long estimatedTableSize, long keyCount, long 
memoryAvailable,
+      HybridHashTableConf nwayConf, String spillLocalDirs)
       throws SerDeException, IOException {
     directWriteHelper = new MapJoinBytesTableContainer.DirectKeyValueWriter();
 
@@ -331,18 +333,29 @@ public class HybridHashTableContainer
     // We also want to limit the size of writeBuffer, because we normally have 
16 partitions, that
     // makes spilling prediction (isMemoryFull) to be too defensive which 
results in unnecessary spilling
     writeBufferSize = writeBufferSize < minWbSize ? minWbSize : 
Math.min(maxWbSize / numPartitions, writeBufferSize);
+    LOG.info("Write buffer size: " + writeBufferSize);
+    memoryUsed = 0;
 
-    this.bloom1 = new BloomFilter(newKeyCount);
-
-    if (LOG.isInfoEnabled()) {
+    if (useBloomFilter) {
+      if (newKeyCount <= BLOOM_FILTER_MAX_SIZE) {
+        this.bloom1 = new BloomFilter(newKeyCount);
+      } else {
+        // To avoid having a huge BloomFilter we need to scale up False 
Positive Probability
+        double fpp = calcFPP(newKeyCount);
+        assert fpp < 1 : "Too many keys! BloomFilter False Positive 
Probability is 1!";
+        if (fpp >= 0.5) {
+          LOG.warn("BloomFilter FPP is greater than 0.5!");
+        }
+        LOG.info("BloomFilter is using FPP: " + fpp);
+        this.bloom1 = new BloomFilter(newKeyCount, fpp);
+      }
       LOG.info(String.format("Using a bloom-1 filter %d keys of size %d bytes",
-          newKeyCount, bloom1.sizeInBytes()));
-      LOG.info("Write buffer size: " + writeBufferSize);
+        newKeyCount, bloom1.sizeInBytes()));
+      memoryUsed = bloom1.sizeInBytes();
     }
 
     hashPartitions = new HashPartition[numPartitions];
     int numPartitionsSpilledOnCreation = 0;
-    memoryUsed = bloom1.sizeInBytes();
     int initialCapacity = Math.max(newKeyCount / numPartitions, threshold / 
numPartitions);
     // maxCapacity should be calculated based on a percentage of 
memoryThreshold, which is to divide
     // row size using long size
@@ -406,6 +419,22 @@ public class HybridHashTableContainer
     }
   }
 
+  /**
+   * Calculate the False Positive Probability (FPP) to use so that the BloomFilter won't grow too big.
+   * @param keyCount number of keys the BloomFilter is being sized for
+   * @return the first FPP, starting at 0.05 and raised in steps of 0.05, whose estimated bit count is at most 800000000
+   */
+  private double calcFPP(int keyCount) {
+    int n = keyCount;
+    double p = 0.05;
+
+    // Calculation below is consistent with BloomFilter.optimalNumOfBits().
+    // Keep raising the FPP until the estimated size is at most 800000000 bits, i.e. 100 MB (800000000/8 bytes)
+    while ((-n * Math.log(p) / (Math.log(2) * Math.log(2))) > 800000000) {
+      p += 0.05;
+    }
+    return p;
+  }
 
   public MapJoinBytesTableContainer.KeyValueHelper getWriteHelper() {
     return writeHelper;
@@ -424,7 +453,7 @@ public class HybridHashTableContainer
    * @return current memory usage
    */
   private long refreshMemoryUsed() {
-    long memUsed = bloom1.sizeInBytes();
+    long memUsed = bloom1 != null ? bloom1.sizeInBytes() : 0;
     for (HashPartition hp : hashPartitions) {
       if (hp.hashMap != null) {
         memUsed += hp.hashMap.memorySize();
@@ -477,7 +506,9 @@ public class HybridHashTableContainer
     int partitionId = keyHash & (hashPartitions.length - 1);
     HashPartition hashPartition = hashPartitions[partitionId];
 
-    bloom1.addLong(keyHash);
+    if (bloom1 != null) {
+      bloom1.addLong(keyHash);
+    }
 
     if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) { // 
destination on disk
       putToSidefile = true;
@@ -909,7 +940,7 @@ public class HybridHashTableContainer
     public JoinUtil.JoinResult setFromOutput(Output output) throws 
HiveException {
       int keyHash = HashCodeUtil.murmurHash(output.getData(), 0, 
output.getLength());
 
-      if (!bloom1.testLong(keyHash)) {
+      if (bloom1 != null && !bloom1.testLong(keyHash)) {
         /*
          * if the keyHash is missing in the bloom filter, then the value cannot
          * exist in any of the spilled partition - return NOMATCH
@@ -1063,7 +1094,7 @@ public class HybridHashTableContainer
       int keyHash = HashCodeUtil.murmurHash(bytes, offset, length);
       partitionId = keyHash & (hashPartitions.length - 1);
 
-      if (!bloom1.testLong(keyHash)) {
+      if (bloom1 != null && !bloom1.testLong(keyHash)) {
         /*
          * if the keyHash is missing in the bloom filter, then the value 
cannot exist in any of the
          * spilled partition - return NOMATCH

Reply via email to