Author: hashutosh
Date: Thu Nov 21 03:15:30 2013
New Revision: 1544017

URL: http://svn.apache.org/r1544017
Log:
HIVE-5692 : Make VectorGroupByOperator parameters configurable (Remus Rusanu 
via Ashutosh Chauhan)

Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
    
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1544017&r1=1544016&r2=1544017&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
(original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu 
Nov 21 03:15:30 2013
@@ -841,6 +841,10 @@ public class HiveConf extends Configurat
 
     //Vectorization enabled
     HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", false),
+    
HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL("hive.vectorized.groupby.checkinterval",
 100000),
+    
HIVE_VECTORIZATION_GROUPBY_MAXENTRIES("hive.vectorized.groupby.maxentries", 
1000000),
+    
HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT("hive.vectorized.groupby.flush.percent",
 (float) 0.1),
+    
 
     HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true),
 

Modified: hive/trunk/conf/hive-default.xml.template
URL: 
http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1544017&r1=1544016&r2=1544017&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Thu Nov 21 03:15:30 2013
@@ -2056,6 +2056,24 @@
 </property>
 
 <property>
+  <name>hive.vectorized.groupby.maxentries</name>
+  <value>1000000</value>
+  <description>Max number of entries in the vector group by aggregation 
hashtables. Exceeding this will trigger a flush irrespective of memory pressure 
conditions.</description>
+</property>
+
+<property>
+  <name>hive.vectorized.groupby.checkinterval</name>
+  <value>100000</value>
+  <description>Number of entries added to the group by aggregation hash before 
a recomputation of average entry size is performed.</description>
+</property>
+
+<property>
+  <name>hive.vectorized.groupby.flush.percent</name>
+  <value>0.1</value>
+  <description>Percent of entries in the group by aggregation hash flushed 
when the memory threshold is exceeded.</description>
+</property>
+
+<property>
   <name>hive.compute.query.using.stats</name>
   <value>false</value>
   <description>
@@ -2065,7 +2083,6 @@
   </description>
 </property>
 
-
 <property>
   <name>hive.metastore.schema.verification</name>
   <value>false</value>

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1544017&r1=1544016&r2=1544017&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
 (original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
 Thu Nov 21 03:15:30 2013
@@ -30,6 +30,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.KeyWrapper;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
@@ -100,16 +101,22 @@ public class VectorGroupByOperator exten
    * Sum of batch size processed (ie. rows).
    */
   private transient long sumBatchSize;
+  
+  /**
+   * Max number of entries in the vector group by aggregation hashtables. 
+   * Exceeding this will trigger a flush irrespective of memory pressure 
conditions.
+   */
+  private transient int maxHtEntries = 1000000;
 
   /**
    * The number of new entries that must be added to the hashtable before a 
memory size check.
    */
-  private static final int FLUSH_CHECK_THRESHOLD = 10000;
+  private transient int checkInterval = 10000;
 
   /**
    * Percent of entries to flush when memory threshold exceeded.
    */
-  private static final float PERCENT_ENTRIES_TO_FLUSH = 0.1f;
+  private transient float percentEntriesToFlush = 0.1f;
 
   /**
    * The global key-aggregation hash map.
@@ -139,6 +146,16 @@ public class VectorGroupByOperator exten
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
+    
+    // hconf is null in unit testing
+    if (null != hconf) {
+      this.percentEntriesToFlush = HiveConf.getFloatVar(hconf,
+        HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT);
+      this.checkInterval = HiveConf.getIntVar(hconf,
+        HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL);
+      this.maxHtEntries = HiveConf.getIntVar(hconf,
+          HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES);
+    }
 
     List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
 
@@ -226,8 +243,21 @@ public class VectorGroupByOperator exten
     processAggregators(batch);
 
     //Flush if memory limits were reached
-    if (shouldFlush(batch)) {
+    // We keep flushing until the memory is under threshold 
+    int preFlushEntriesCount = numEntriesHashTable;
+    while (shouldFlush(batch)) {
       flush(false);
+      
+      //Validate that some progress is being made
+      if (!(numEntriesHashTable < preFlushEntriesCount)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Flush did not progress: %d entries before, 
%d entries after",
+              preFlushEntriesCount,
+              numEntriesHashTable));
+        }
+        break;
+      }
+      preFlushEntriesCount = numEntriesHashTable;
     }
 
     if (sumBatchSize == 0 && 0 != batch.size) {
@@ -247,7 +277,7 @@ public class VectorGroupByOperator exten
   private void flush(boolean all) throws HiveException {
 
     int entriesToFlush = all ? numEntriesHashTable :
-      (int)(numEntriesHashTable * PERCENT_ENTRIES_TO_FLUSH);
+      (int)(numEntriesHashTable * this.percentEntriesToFlush);
     int entriesFlushed = 0;
 
     if (LOG.isDebugEnabled()) {
@@ -309,14 +339,18 @@ public class VectorGroupByOperator exten
    * Returns true if the memory threshold for the hash table was reached.
    */
   private boolean shouldFlush(VectorizedRowBatch batch) {
-    if (numEntriesSinceCheck < FLUSH_CHECK_THRESHOLD ||
-        batch.size == 0) {
+    if (batch.size == 0) {
       return false;
     }
-    // Were going to update the average variable row size by sampling the 
current batch
-    updateAvgVariableSize(batch);
-    numEntriesSinceCheck = 0;
-    return numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > 
maxHashTblMemory;
+    //numEntriesSinceCheck is the number of entries added to the hash table
+    // since the last time we checked the average variable size
+    if (numEntriesSinceCheck >= this.checkInterval) {
+      // We're going to update the average variable row size by sampling the 
current batch
+      updateAvgVariableSize(batch);
+      numEntriesSinceCheck = 0;
+    }
+    return numEntriesHashTable > this.maxHtEntries ||
+        numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > 
maxHashTblMemory;
   }
 
   /**

Modified: 
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java?rev=1544017&r1=1544016&r2=1544017&view=diff
==============================================================================
--- 
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
 (original)
+++ 
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
 Thu Nov 21 03:15:30 2013
@@ -23,12 +23,15 @@ import static org.junit.Assert.assertFal
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
 import java.lang.reflect.Constructor;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -155,6 +158,88 @@ public class TestVectorGroupByOperator {
 
     return desc;
   }
+  
+  long outputRowCount = 0;
+  
+  @Test
+  public void testMemoryPressureFlush() throws HiveException {
+
+    Map<String, Integer> mapColumnNames = new HashMap<String, Integer>();
+    mapColumnNames.put("Key", 0);
+    mapColumnNames.put("Value", 1);
+    VectorizationContext ctx = new VectorizationContext(mapColumnNames, 2);
+
+    GroupByDesc desc = buildKeyGroupByDesc (ctx, "max", 
+        "Value", TypeInfoFactory.longTypeInfo, 
+        "Key", TypeInfoFactory.longTypeInfo);
+    
+    // Set the memory threshold so that we get 100KB before we need to flush.
+    MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+    long maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
+    
+    float treshold = 100.0f*1024.0f/maxMemory;
+    desc.setMemoryThreshold(treshold);
+
+    VectorGroupByOperator vgo = new VectorGroupByOperator(ctx, desc);
+    
+    FakeCaptureOutputOperator out = 
FakeCaptureOutputOperator.addCaptureOutputChild(vgo);
+    vgo.initialize(null, null);
+    
+    this.outputRowCount = 0;
+    out.setOutputInspector(new FakeCaptureOutputOperator.OutputInspector() {
+      @Override
+      public void inspectRow(Object row, int tag) throws HiveException {
+        ++outputRowCount;
+      }
+    });
+          
+    Iterable<Object> it = new Iterable<Object>() {
+      @Override
+      public Iterator<Object> iterator() {
+        return new Iterator<Object> () {
+          long value = 0;
+
+          @Override
+          public boolean hasNext() {
+            return true;
+          }
+
+          @Override
+          public Object next() {
+            return ++value;
+          }
+
+          @Override
+          public void remove() {
+          }
+        };
+      }
+    };
+    
+    FakeVectorRowBatchFromObjectIterables data = new 
FakeVectorRowBatchFromObjectIterables(
+        100,
+        new String[] {"long", "long"},
+        it,
+        it);
+
+    // The 'it' data source will produce data w/o ever ending
+    // We want to see that memory pressure kicks in and some 
+    // entries in the VGBY are flushed.
+    long countRowsProduced = 0;
+    for (VectorizedRowBatch unit: data) {
+      countRowsProduced += 100;
+      vgo.process(unit,  0);
+      if (0 < outputRowCount) {
+        break;
+      }
+      // Set an upper bound on how much we're willing to push before it should 
flush
+      // we've set the memory threshold at 100KB, each key is distinct
+      // It should not go beyond 100k/16 (key+data)
+      assertTrue(countRowsProduced < 100*1024/16);
+    }
+    
+    assertTrue(0 < outputRowCount);
+  }
 
   @Test
   public void testMultiKeyIntStringInt() throws HiveException {


Reply via email to