DRILL-1022: Increase default min hash table size and allow setting min/max size 
for hash table.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ff39fb83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ff39fb83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ff39fb83

Branch: refs/heads/master
Commit: ff39fb8383e038aadbf4810a6b4ad5f22d25a181
Parents: 4243f54
Author: Aman Sinha <[email protected]>
Authored: Tue Jun 17 22:41:14 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Wed Jun 18 21:50:09 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/drill/exec/ExecConstants.java  |  9 +++++++++
 .../drill/exec/physical/config/HashAggregate.java  | 17 -----------------
 .../exec/physical/impl/aggregate/HashAggBatch.java | 10 +++++++++-
 .../physical/impl/aggregate/HashAggTemplate.java   |  5 +++--
 .../physical/impl/aggregate/HashAggregator.java    |  3 ++-
 .../drill/exec/physical/impl/common/HashTable.java |  2 +-
 .../exec/physical/impl/join/HashJoinBatch.java     |  5 ++++-
 .../exec/server/options/SystemOptionManager.java   |  5 +++--
 .../exec/physical/impl/join/TestHashJoin.java      | 13 +++++++++++--
 9 files changed, 42 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 6673c4c..7681dd5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec;
 
+import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.server.options.OptionValidator;
 import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
 import org.apache.drill.exec.server.options.TypeValidators.DoubleValidator;
@@ -86,6 +87,14 @@ public interface ExecConstants {
 
   public static final String SLICE_TARGET = "planner.slice_target";
   public static final OptionValidator SLICE_TARGET_OPTION = new 
PositiveLongValidator(SLICE_TARGET, Long.MAX_VALUE, 1000000);
+  
+  /**
+   * HashTable runtime settings
+   */
+  public static final String MIN_HASH_TABLE_SIZE_KEY = 
"exec.min_hash_table_size";
+  public static final OptionValidator MIN_HASH_TABLE_SIZE = new 
PositiveLongValidator(MIN_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, 
HashTable.DEFAULT_INITIAL_CAPACITY);
+  public static final String MAX_HASH_TABLE_SIZE_KEY = 
"exec.max_hash_table_size";
+  public static final OptionValidator MAX_HASH_TABLE_SIZE = new 
PositiveLongValidator(MAX_HASH_TABLE_SIZE_KEY, HashTable.MAXIMUM_CAPACITY, 
HashTable.MAXIMUM_CAPACITY);
 
   /**
    * Limits the maximum level of parallelization to this factor times the 
number of Drillbits

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
index e4ce5f8..694570c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
@@ -21,8 +21,6 @@ import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.physical.impl.common.HashTable;
-import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -39,23 +37,12 @@ public class HashAggregate extends AbstractSingle {
 
   private final float cardinality;
 
-  // configuration parameters for the hash table
-  private final HashTableConfig htConfig;
-
   @JsonCreator
   public HashAggregate(@JsonProperty("child") PhysicalOperator child, 
@JsonProperty("keys") NamedExpression[] groupByExprs, @JsonProperty("exprs") 
NamedExpression[] aggrExprs, @JsonProperty("cardinality") float cardinality) {
     super(child);
     this.groupByExprs = groupByExprs;
     this.aggrExprs = aggrExprs;
     this.cardinality = cardinality;
-
-    int initial_capacity = cardinality > HashTable.DEFAULT_INITIAL_CAPACITY ?
-      (int) cardinality : HashTable.DEFAULT_INITIAL_CAPACITY;
-
-    this.htConfig = new HashTableConfig(initial_capacity,
-                                        HashTable.DEFAULT_LOAD_FACTOR,
-                                        groupByExprs,
-                                        null /* no probe exprs */) ;
   }
 
   public NamedExpression[] getGroupByExprs() {
@@ -70,10 +57,6 @@ public class HashAggregate extends AbstractSingle {
     return cardinality;
   }
 
-  public HashTableConfig getHtConfig() {
-    return htConfig;
-  }
-
   @Override
   public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E{
     return physicalVisitor.visitHashAggregate(this, value);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index dd58562..6adc304 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -25,6 +25,7 @@ import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -48,6 +49,8 @@ import 
org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome;
+import org.apache.drill.exec.physical.impl.common.HashTable;
+import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 
 import com.google.common.collect.Lists;
 import com.sun.codemodel.JExpr;
@@ -220,7 +223,12 @@ public class HashAggBatch extends 
AbstractRecordBatch<HashAggregate> {
     container.buildSchema(SelectionVectorMode.NONE);
     HashAggregator agg = context.getImplementationClass(top);
 
-    agg.setup(popConfig, context, this.stats,
+    HashTableConfig htConfig = new 
HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(),
+                                                   
HashTable.DEFAULT_LOAD_FACTOR,
+                                                   popConfig.getGroupByExprs(),
+                                                   null /* no probe exprs */) ;
+    
+    agg.setup(popConfig, htConfig, context, this.stats,
               oContext.getAllocator(), incoming, this,
               aggrExprs,
               cgInner.getWorkspaceTypes(),

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 72095b7..5069a2d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -171,7 +171,8 @@ public abstract class HashAggTemplate implements 
HashAggregator {
 
 
   @Override
-  public void setup(HashAggregate hashAggrConfig, FragmentContext context, 
+  public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, 
+                    FragmentContext context, 
                     OperatorStats stats,
                     BufferAllocator allocator, RecordBatch incoming, 
HashAggBatch outgoing,
                     LogicalExpression[] valueExprs,
@@ -219,7 +220,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
       }
     }
 
-    ChainedHashTable ht = new ChainedHashTable(hashAggrConfig.getHtConfig(), 
context, allocator, incoming, null /* no incoming probe */, outgoing) ;
+    ChainedHashTable ht = new ChainedHashTable(htConfig, context, allocator, 
incoming, null /* no incoming probe */, outgoing) ;
     this.htable = ht.createAndSetupHashTable(groupByOutFieldIds) ;
 
     batchHolders = new ArrayList<BatchHolder>();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index d14880c..b94f299 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.TypedFieldId;
@@ -42,7 +43,7 @@ public interface HashAggregator {
     RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR
          }
   
-  public abstract void setup(HashAggregate hashAggrConfig, FragmentContext 
context, 
+  public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig 
htConfig, FragmentContext context, 
                              OperatorStats stats, BufferAllocator allocator, 
RecordBatch incoming,
                              HashAggBatch outgoing, LogicalExpression[] 
valueExprs, 
                              List<TypedFieldId> valueFieldIds,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 429ec63..9f5d4f8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -30,7 +30,7 @@ public interface HashTable {
   public static TemplateClassDefinition<HashTable> TEMPLATE_DEFINITION = new 
TemplateClassDefinition<HashTable>(HashTable.class, HashTableTemplate.class);
 
   /** The initial default capacity of the hash table (in terms of number of 
buckets). */
-  static final public int DEFAULT_INITIAL_CAPACITY = 1 << 8; 
+  static final public int DEFAULT_INITIAL_CAPACITY = 1 << 16; 
 
   /** The maximum capacity of the hash table (in terms of number of buckets). 
*/
   static final public int MAXIMUM_CAPACITY = 1 << 30; 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index c43b99a..11368e3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -27,6 +27,7 @@ import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -257,7 +258,9 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
           }
         }
 
-        HashTableConfig htConfig = new 
HashTableConfig(HashTable.DEFAULT_INITIAL_CAPACITY, 
HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
+        HashTableConfig htConfig = 
+            new 
HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(),
 
+            HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
 
         // Create the chained hash table
         ChainedHashTable ht  = new ChainedHashTable(htConfig, context, 
oContext.getAllocator(), this.right, this.left, null);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 8503197..a42640f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -64,8 +64,9 @@ public class SystemOptionManager implements OptionManager{
       ExecConstants.LARGE_QUEUE_SIZE,
       ExecConstants.QUEUE_THRESHOLD_SIZE,
       ExecConstants.QUEUE_TIMEOUT,
-      ExecConstants.SMALL_QUEUE_SIZE
-
+      ExecConstants.SMALL_QUEUE_SIZE, 
+      ExecConstants.MIN_HASH_TABLE_SIZE,
+      ExecConstants.MAX_HASH_TABLE_SIZE
   };
 
   public final PStoreConfig<OptionValue> config;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ff39fb83/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
index d4a86ca..e24426e 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
@@ -29,6 +29,8 @@ import mockit.NonStrictExpectations;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.common.util.TestTools;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.local.LocalCache;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.TopLevelAllocator;
@@ -42,7 +44,6 @@ import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.pop.PopUnitTestBase;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.proto.UserProtos;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
@@ -50,7 +51,10 @@ import org.apache.drill.exec.rpc.user.UserServer;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
-import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.SessionOptionManager;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
 import org.apache.drill.exec.vector.ValueVector;
 import org.junit.Rule;
 import org.junit.Test;
@@ -69,11 +73,16 @@ public class TestHashJoin extends PopUnitTestBase{
     DrillConfig c = DrillConfig.create();
 
     private void testHJMockScanCommon(final DrillbitContext bitContext, 
UserServer.UserClientConnection connection, String physicalPlan, int 
expectedRows) throws Throwable {
+      final LocalPStoreProvider provider = new LocalPStoreProvider(c);
+      provider.start();
+      final SystemOptionManager opt = new SystemOptionManager(c, provider);
+      opt.init();
         new NonStrictExpectations(){{
             bitContext.getMetrics(); result = new MetricRegistry();
             bitContext.getAllocator(); result = new TopLevelAllocator();
             bitContext.getOperatorCreatorRegistry(); result = new 
OperatorCreatorRegistry(c);
             bitContext.getConfig(); result = c;
+            bitContext.getOptionManager(); result = opt;
         }};
 
         PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), 
CoordinationProtos.DrillbitEndpoint.getDefaultInstance());

Reply via email to