Repository: hive
Updated Branches:
  refs/heads/master a88050bd9 -> cbebb4d78


HIVE-12837 : Better memory estimation/allocation for hybrid grace hash join 
during hash table loading (Wei Zheng, reviewed by Vikram Dixit K)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cbebb4d7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cbebb4d7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cbebb4d7

Branch: refs/heads/master
Commit: cbebb4d78064a9098e4145a0f7532f08885c9b27
Parents: a88050b
Author: Wei Zheng <w...@apache.org>
Authored: Wed May 4 23:09:08 2016 -0700
Committer: Wei Zheng <w...@apache.org>
Committed: Wed May 4 23:09:08 2016 -0700

----------------------------------------------------------------------
 .../persistence/HybridHashTableContainer.java   | 60 +++++++++++++++-----
 .../ql/exec/persistence/KeyValueContainer.java  |  4 ++
 2 files changed, 51 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cbebb4d7/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index f5da5a4..5552dfb 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -90,6 +90,7 @@ public class HybridHashTableContainer
   private boolean lastPartitionInMem;           // only one (last one) 
partition is left in memory
   private final int memoryCheckFrequency;       // how often (# of rows apart) 
to check if memory is full
   private final HybridHashTableConf nwayConf;         // configuration for 
n-way join
+  private int writeBufferSize;                  // write buffer size for 
BytesBytesMultiHashMap
 
   /** The OI used to deserialize values. We never deserialize keys. */
   private LazyBinaryStructObjectInspector internalValueOi;
@@ -294,7 +295,6 @@ public class HybridHashTableContainer
     this.spillLocalDirs = spillLocalDirs;
 
     this.nwayConf = nwayConf;
-    int writeBufferSize;
     int numPartitions;
     if (nwayConf == null) { // binary join
       numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, 
minNumParts, minWbSize);
@@ -327,7 +327,9 @@ public class HybridHashTableContainer
         writeBufferSize : Integer.highestOneBit(writeBufferSize);
 
     // Cap WriteBufferSize to avoid large preallocations
-    writeBufferSize = writeBufferSize < minWbSize ? minWbSize : 
Math.min(maxWbSize, writeBufferSize);
+    // We also want to limit the size of writeBuffer, because we normally have 
16 partitions, which
+    // makes the spilling prediction (isMemoryFull) too defensive and 
results in unnecessary spilling
+    writeBufferSize = writeBufferSize < minWbSize ? minWbSize : 
Math.min(maxWbSize / numPartitions, writeBufferSize);
 
     this.bloom1 = new BloomFilter(newKeyCount);
 
@@ -417,6 +419,11 @@ public class HybridHashTableContainer
     for (HashPartition hp : hashPartitions) {
       if (hp.hashMap != null) {
         memUsed += hp.hashMap.memorySize();
+      } else {
+        // also include the still-in-memory sidefile, before it has been 
truly spilled
+        if (hp.sidefileKVContainer != null) {
+          memUsed += hp.sidefileKVContainer.numRowsInReadBuffer() * 
tableRowSize;
+        }
       }
     }
     return memoryUsed = memUsed;
@@ -454,6 +461,8 @@ public class HybridHashTableContainer
   private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper,
           Writable currentKey, Writable currentValue) throws SerDeException, 
IOException {
 
+    boolean putToSidefile = false; // by default we put row into partition in 
memory
+
     // Next, put row into corresponding hash partition
     int keyHash = keyValueHelper.getHashFromKey();
     int partitionId = keyHash & (hashPartitions.length - 1);
@@ -461,15 +470,13 @@ public class HybridHashTableContainer
 
     bloom1.addLong(keyHash);
 
-    if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) {
-      KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer();
-      kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue);
-    } else {
-      hashPartition.hashMap.put(keyValueHelper, keyHash); // Pass along 
hashcode to avoid recalculation
-      totalInMemRowCount++;
-
-      if ((totalInMemRowCount & (this.memoryCheckFrequency - 1)) == 0 &&  // 
check periodically
-          !lastPartitionInMem) { // If this is the only partition in memory, 
proceed without check
+    if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) { // 
destination on disk
+      putToSidefile = true;
+    } else {  // destination in memory
+      if (!lastPartitionInMem &&        // If this is the only partition in 
memory, proceed without check
+          (hashPartition.size() == 0 || // Destination partition being empty 
indicates a write buffer
+                                        // will be allocated, thus need to 
check if memory is full
+           (totalInMemRowCount & (this.memoryCheckFrequency - 1)) == 0)) {  // 
check periodically
         if (isMemoryFull()) {
           if ((numPartitionsSpilled == hashPartitions.length - 1) ) {
             LOG.warn("This LAST partition in memory won't be spilled!");
@@ -479,9 +486,16 @@ public class HybridHashTableContainer
               int biggest = biggestPartition();
               spillPartition(biggest);
               this.setSpill(true);
+              if (partitionId == biggest) { // destination hash partition has 
just been spilled
+                putToSidefile = true;
+              }
             } else {                // n-way join
               LOG.info("N-way spilling: spill tail partition from previously 
loaded small tables");
+              int biggest = nwayConf.getNextSpillPartition();
               memoryThreshold += nwayConf.spill();
+              if (biggest != 0 && partitionId == biggest) { // destination 
hash partition has just been spilled
+                putToSidefile = true;
+              }
               LOG.info("Memory threshold has been increased to: " + 
memoryThreshold);
             }
             numPartitionsSpilled++;
@@ -490,6 +504,15 @@ public class HybridHashTableContainer
       }
     }
 
+    // Now we know where to put row
+    if (putToSidefile) {
+      KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer();
+      kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue);
+    } else {
+      hashPartition.hashMap.put(keyValueHelper, keyHash); // Pass along 
hashcode to avoid recalculation
+      totalInMemRowCount++;
+    }
+
     return null; // there's no key to return
   }
 
@@ -513,11 +536,21 @@ public class HybridHashTableContainer
   }
 
   /**
-   * Check if the memory threshold is reached
+   * Check if the memory threshold is about to be reached.
+   * Since all the write buffers will be lazily allocated in 
BytesBytesMultiHashMap, we need to
+   * consider those as well.
    * @return true if memory is full, false if not
    */
   private boolean isMemoryFull() {
-    return refreshMemoryUsed() >= memoryThreshold;
+    int numPartitionsInMem = 0;
+
+    for (HashPartition hp : hashPartitions) {
+      if (!hp.isHashMapOnDisk()) {
+        numPartitionsInMem++;
+      }
+    }
+
+    return refreshMemoryUsed() + writeBufferSize * numPartitionsInMem >= 
memoryThreshold;
   }
 
   /**
@@ -561,6 +594,7 @@ public class HybridHashTableContainer
         new com.esotericsoftware.kryo.io.Output(outputStream);
     Kryo kryo = SerializationUtilities.borrowKryo();
     try {
+      LOG.info("Trying to spill hash partition " + partitionId + " ...");
       kryo.writeObject(output, partition.hashMap);  // use Kryo to serialize 
hashmap
       output.close();
       outputStream.close();

http://git-wip-us.apache.org/repos/asf/hive/blob/cbebb4d7/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
index e2b22d3..72faf8b 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
@@ -215,6 +215,10 @@ public class KeyValueContainer {
     return row;
   }
 
+  public int numRowsInReadBuffer() {
+    return rowsInReadBuffer;
+  }
+
   public int size() {
     return rowsInReadBuffer + rowsOnDisk;
   }

Reply via email to