Author: hashutosh
Date: Thu Nov 21 03:15:30 2013
New Revision: 1544017
URL: http://svn.apache.org/r1544017
Log:
HIVE-5692 : Make VectorGroupByOperator parameters configurable (Remus Rusanu
via Ashutosh Chauhan)
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL:
http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1544017&r1=1544016&r2=1544017&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
(original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu
Nov 21 03:15:30 2013
@@ -841,6 +841,10 @@ public class HiveConf extends Configurat
//Vectorization enabled
HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", false),
+
HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL("hive.vectorized.groupby.checkinterval",
100000),
+
HIVE_VECTORIZATION_GROUPBY_MAXENTRIES("hive.vectorized.groupby.maxentries",
1000000),
+
HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT("hive.vectorized.groupby.flush.percent",
(float) 0.1),
+
HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true),
Modified: hive/trunk/conf/hive-default.xml.template
URL:
http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1544017&r1=1544016&r2=1544017&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Thu Nov 21 03:15:30 2013
@@ -2056,6 +2056,24 @@
</property>
<property>
+ <name>hive.vectorized.groupby.maxentries</name>
+ <value>1000000</value>
+  <description>Max number of entries in the vector group by aggregation
hashtables. Exceeding this will trigger a flush regardless of the memory
pressure condition.</description>
+</property>
+
+<property>
+ <name>hive.vectorized.groupby.checkinterval</name>
+ <value>100000</value>
+  <description>Number of entries added to the group by aggregation hash before
a recomputation of average entry size is performed.</description>
+</property>
+
+<property>
+ <name>hive.vectorized.groupby.flush.percent</name>
+ <value>0.1</value>
+  <description>Percent of entries in the group by aggregation hash flushed
when the memory threshold is exceeded.</description>
+</property>
+
+<property>
<name>hive.compute.query.using.stats</name>
<value>false</value>
<description>
@@ -2065,7 +2083,6 @@
</description>
</property>
-
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1544017&r1=1544016&r2=1544017&view=diff
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
(original)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
Thu Nov 21 03:15:30 2013
@@ -30,6 +30,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.KeyWrapper;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
@@ -100,16 +101,22 @@ public class VectorGroupByOperator exten
* Sum of batch size processed (ie. rows).
*/
private transient long sumBatchSize;
+
+ /**
+ * Max number of entries in the vector group by aggregation hashtables.
+ * Exceeding this will trigger a flush regardless of the memory pressure
condition.
+ */
+ private transient int maxHtEntries = 1000000;
/**
* The number of new entries that must be added to the hashtable before a
memory size check.
*/
- private static final int FLUSH_CHECK_THRESHOLD = 10000;
+ private transient int checkInterval = 10000;
/**
* Percent of entries to flush when memory threshold exceeded.
*/
- private static final float PERCENT_ENTRIES_TO_FLUSH = 0.1f;
+ private transient float percentEntriesToFlush = 0.1f;
/**
* The global key-aggregation hash map.
@@ -139,6 +146,16 @@ public class VectorGroupByOperator exten
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
+
+ // hconf is null in unit testing
+ if (null != hconf) {
+ this.percentEntriesToFlush = HiveConf.getFloatVar(hconf,
+ HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_FLUSH_PERCENT);
+ this.checkInterval = HiveConf.getIntVar(hconf,
+ HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL);
+ this.maxHtEntries = HiveConf.getIntVar(hconf,
+ HiveConf.ConfVars.HIVE_VECTORIZATION_GROUPBY_MAXENTRIES);
+ }
List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
@@ -226,8 +243,21 @@ public class VectorGroupByOperator exten
processAggregators(batch);
//Flush if memory limits were reached
- if (shouldFlush(batch)) {
+ // We keep flushing until the memory is under threshold
+ int preFlushEntriesCount = numEntriesHashTable;
+ while (shouldFlush(batch)) {
flush(false);
+
+ //Validate that some progress is being made
+ if (!(numEntriesHashTable < preFlushEntriesCount)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Flush did not progress: %d entries before,
%d entries after",
+ preFlushEntriesCount,
+ numEntriesHashTable));
+ }
+ break;
+ }
+ preFlushEntriesCount = numEntriesHashTable;
}
if (sumBatchSize == 0 && 0 != batch.size) {
@@ -247,7 +277,7 @@ public class VectorGroupByOperator exten
private void flush(boolean all) throws HiveException {
int entriesToFlush = all ? numEntriesHashTable :
- (int)(numEntriesHashTable * PERCENT_ENTRIES_TO_FLUSH);
+ (int)(numEntriesHashTable * this.percentEntriesToFlush);
int entriesFlushed = 0;
if (LOG.isDebugEnabled()) {
@@ -309,14 +339,18 @@ public class VectorGroupByOperator exten
* Returns true if the memory threshold for the hash table was reached.
*/
private boolean shouldFlush(VectorizedRowBatch batch) {
- if (numEntriesSinceCheck < FLUSH_CHECK_THRESHOLD ||
- batch.size == 0) {
+ if (batch.size == 0) {
return false;
}
- // Were going to update the average variable row size by sampling the
current batch
- updateAvgVariableSize(batch);
- numEntriesSinceCheck = 0;
- return numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) >
maxHashTblMemory;
+ //numEntriesSinceCheck is the number of entries added to the hash table
+ // since the last time we checked the average variable size
+ if (numEntriesSinceCheck >= this.checkInterval) {
+ // We're going to update the average variable row size by sampling the
current batch
+ updateAvgVariableSize(batch);
+ numEntriesSinceCheck = 0;
+ }
+ return numEntriesHashTable > this.maxHtEntries ||
+ numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) >
maxHashTblMemory;
}
/**
Modified:
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java?rev=1544017&r1=1544016&r2=1544017&view=diff
==============================================================================
---
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
(original)
+++
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
Thu Nov 21 03:15:30 2013
@@ -23,12 +23,15 @@ import static org.junit.Assert.assertFal
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
import java.lang.reflect.Constructor;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -155,6 +158,88 @@ public class TestVectorGroupByOperator {
return desc;
}
+
+ long outputRowCount = 0;
+
+ @Test
+ public void testMemoryPressureFlush() throws HiveException {
+
+ Map<String, Integer> mapColumnNames = new HashMap<String, Integer>();
+ mapColumnNames.put("Key", 0);
+ mapColumnNames.put("Value", 1);
+ VectorizationContext ctx = new VectorizationContext(mapColumnNames, 2);
+
+ GroupByDesc desc = buildKeyGroupByDesc (ctx, "max",
+ "Value", TypeInfoFactory.longTypeInfo,
+ "Key", TypeInfoFactory.longTypeInfo);
+
+ // Set the memory threshold so that we get 100Kb before we need to flush.
+ MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+ long maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
+
+ float treshold = 100.0f*1024.0f/maxMemory;
+ desc.setMemoryThreshold(treshold);
+
+ VectorGroupByOperator vgo = new VectorGroupByOperator(ctx, desc);
+
+ FakeCaptureOutputOperator out =
FakeCaptureOutputOperator.addCaptureOutputChild(vgo);
+ vgo.initialize(null, null);
+
+ this.outputRowCount = 0;
+ out.setOutputInspector(new FakeCaptureOutputOperator.OutputInspector() {
+ @Override
+ public void inspectRow(Object row, int tag) throws HiveException {
+ ++outputRowCount;
+ }
+ });
+
+ Iterable<Object> it = new Iterable<Object>() {
+ @Override
+ public Iterator<Object> iterator() {
+ return new Iterator<Object> () {
+ long value = 0;
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public Object next() {
+ return ++value;
+ }
+
+ @Override
+ public void remove() {
+ }
+ };
+ }
+ };
+
+ FakeVectorRowBatchFromObjectIterables data = new
FakeVectorRowBatchFromObjectIterables(
+ 100,
+ new String[] {"long", "long"},
+ it,
+ it);
+
+ // The 'it' data source will produce data w/o ever ending
+ // We want to see that memory pressure kicks in and some
+ // entries in the VGBY are flushed.
+ long countRowsProduced = 0;
+ for (VectorizedRowBatch unit: data) {
+ countRowsProduced += 100;
+ vgo.process(unit, 0);
+ if (0 < outputRowCount) {
+ break;
+ }
+ // Set an upper bound how much we're willing to push before it should
flush
+ // we've set the memory treshold at 100kb, each key is distinct
+ // It should not go beyond 100k/16 (key+data)
+ assertTrue(countRowsProduced < 100*1024/16);
+ }
+
+ assertTrue(0 < outputRowCount);
+ }
@Test
public void testMultiKeyIntStringInt() throws HiveException {