DRILL-1111: 1. Create new batch holder for hash table if an insertion fails due 
to space constraints in existing batch holder. 2. Use allocateNew() for hash 
aggr and hash table allocations. 3. Create a new output batch if outputting 
values fails the first time due to space constraints. 4. Use splitAndTransfer to 
transfer the keys from hash table's container to the output batch.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/dcc94a39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/dcc94a39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/dcc94a39

Branch: refs/heads/master
Commit: dcc94a3980498761f81d8e87cc04097ebcfdb3f7
Parents: a59debb
Author: Jacques Nadeau <jacq...@apache.org>
Authored: Wed Jun 25 09:00:16 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Mon Jul 7 14:50:05 2014 -0700

----------------------------------------------------------------------
 .../physical/impl/aggregate/HashAggBatch.java   |   9 +-
 .../impl/aggregate/HashAggTemplate.java         | 152 ++++++++++---------
 .../physical/impl/aggregate/HashAggregator.java |   3 +-
 .../physical/impl/common/ChainedHashTable.java  |   2 +-
 .../exec/physical/impl/common/HashTable.java    |   5 +-
 .../physical/impl/common/HashTableTemplate.java |  57 +++++--
 .../exec/physical/impl/join/HashJoinBatch.java  |   2 +-
 .../planner/sql/handlers/DefaultSqlHandler.java |   2 +-
 8 files changed, 139 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 3609c02..b30a357 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -193,8 +193,6 @@ public class HashAggBatch extends 
AbstractRecordBatch<HashAggregate> {
     ClassGenerator<HashAggregator> cgInner = 
cg.getInnerGenerator("BatchHolder");
 
     container.clear();
-    List<VectorAllocator> keyAllocators = Lists.newArrayList();
-    List<VectorAllocator> valueAllocators = Lists.newArrayList();
 
     int numGroupByExprs = (popConfig.getGroupByExprs() != null) ? 
popConfig.getGroupByExprs().length : 0;
     int numAggrExprs = (popConfig.getAggrExprs() != null) ? 
popConfig.getAggrExprs().length : 0;
@@ -213,7 +211,6 @@ public class HashAggBatch extends 
AbstractRecordBatch<HashAggregate> {
 
       final MaterializedField outputField = 
MaterializedField.create(ne.getRef(), expr.getMajorType());
       ValueVector vv = TypeHelper.getNewVector(outputField, 
oContext.getAllocator());
-      keyAllocators.add(VectorAllocator.getAllocator(vv, 200));
 
       // add this group-by vector to the output container
       groupByOutFieldIds[i] = container.add(vv);
@@ -229,7 +226,6 @@ public class HashAggBatch extends 
AbstractRecordBatch<HashAggregate> {
 
       final MaterializedField outputField = 
MaterializedField.create(ne.getRef(), expr.getMajorType());
       ValueVector vv = TypeHelper.getNewVector(outputField, 
oContext.getAllocator());
-      valueAllocators.add(VectorAllocator.getAllocator(vv, 200));
       aggrOutFieldIds[i] = container.add(vv);
 
       aggrExprs[i] = new ValueVectorWriteExpression(aggrOutFieldIds[i], expr, 
true);
@@ -251,9 +247,8 @@ public class HashAggBatch extends 
AbstractRecordBatch<HashAggregate> {
               oContext.getAllocator(), incoming, this,
               aggrExprs,
               cgInner.getWorkspaceTypes(),
-              groupByOutFieldIds,
-              keyAllocators.toArray(new VectorAllocator[keyAllocators.size()]),
-              valueAllocators.toArray(new 
VectorAllocator[valueAllocators.size()]));
+              groupByOutFieldIds, 
+              this.container); 
 
     return agg;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index eaf2811..0a44f3a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import javax.inject.Named;
@@ -28,10 +29,8 @@ import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.IntHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -43,7 +42,6 @@ import 
org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
-import 
org.apache.drill.exec.physical.impl.common.HashTableTemplate.BatchHolder;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
@@ -54,8 +52,6 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.expr.holders.BigIntHolder;
 
 import com.google.common.collect.Lists;
 
@@ -80,8 +76,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
   private RecordBatch incoming;
   private BatchSchema schema;
   private HashAggBatch outgoing;
-  private VectorAllocator[] keyAllocators;
-  private VectorAllocator[] valueAllocators;
+  private VectorContainer outContainer;
   private FragmentContext context;
   private BufferAllocator allocator;
 
@@ -89,6 +84,9 @@ public abstract class HashAggTemplate implements 
HashAggregator {
   private HashTable htable;
   private ArrayList<BatchHolder> batchHolders;
   private IntHolder htIdxHolder; // holder for the Hashtable's internal index 
returned by put()
+  private IntHolder outStartIdxHolder;
+  private IntHolder outNumRecordsHolder;
+  private int numGroupByOutFields = 0; // Note: this should be <= number of 
group-by fields
 
   List<VectorAllocator> wsAllocators = Lists.newArrayList();  // allocators 
for the workspace vectors
   ErrorCollector collector = new ErrorCollectorImpl();
@@ -132,7 +130,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
         MaterializedField outputField = materializedValueFields[i];
         // Create a type-specific ValueVector for this value
         vector = TypeHelper.getNewVector(outputField, allocator) ;
-        VectorAllocator.getAllocator(vector, 50 /* avg. width 
*/).alloc(HashTable.BATCH_SIZE) ;
+        vector.allocateNew();
 
         aggrValuesContainer.add(vector) ;
       }
@@ -149,16 +147,27 @@ public abstract class HashAggTemplate implements 
HashAggregator {
       setupInterior(incoming, outgoing, aggrValuesContainer);
     }
 
-    private boolean outputValues() {
-      for (int i = 0; i <= maxOccupiedIdx; i++) {
+    private boolean outputValues(IntHolder outStartIdxHolder, IntHolder 
outNumRecordsHolder) {
+      outStartIdxHolder.value = batchOutputCount;
+      outNumRecordsHolder.value = 0;
+      boolean status = true;
+      for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
         if (outputRecordValues(i, batchOutputCount) ) {
           if (EXTRA_DEBUG_2) logger.debug("Outputting values to output index: 
{}", batchOutputCount) ;
           batchOutputCount++;
+          outNumRecordsHolder.value++;
         } else {
-          return false;
+          status = false;
+          break;
         }
       }
-      return true;
+      // It's not a failure if only some records were output (at least 1) .. 
since out-of-memory
+      // conditions may prevent all records from being output; the caller has 
the responsibility to
+      // allocate more memory and continue outputting more records
+      if (!status && outNumRecordsHolder.value > 0) {
+        status = true;
+      }
+      return status;
     }
 
     private void clear() {
@@ -169,10 +178,10 @@ public abstract class HashAggTemplate implements 
HashAggregator {
       return maxOccupiedIdx + 1;
     }
 
-    private int getOutputCount() {
-      return batchOutputCount;
+    private int getNumPendingOutput() {
+      return getNumGroups() - batchOutputCount;
     }
-
+    
     // Code-generated methods (implemented in HashAggBatch)
 
     @RuntimeOverridden
@@ -193,8 +202,8 @@ public abstract class HashAggTemplate implements 
HashAggregator {
                     BufferAllocator allocator, RecordBatch incoming, 
HashAggBatch outgoing,
                     LogicalExpression[] valueExprs,
                     List<TypedFieldId> valueFieldIds,
-                    TypedFieldId[] groupByOutFieldIds,
-                    VectorAllocator[] keyAllocators, VectorAllocator[] 
valueAllocators)
+                    TypedFieldId[] groupByOutFieldIds, 
+                    VectorContainer outContainer) 
     throws SchemaChangeException, ClassTransformationException, IOException {
 
     if (valueExprs == null || valueFieldIds == null) {
@@ -209,9 +218,8 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     this.allocator = allocator;
     this.incoming = incoming;
     this.schema = incoming.getSchema();
-    this.keyAllocators = keyAllocators;
-    this.valueAllocators = valueAllocators;
     this.outgoing = outgoing;
+    this.outContainer = outContainer;
 
     this.hashAggrConfig = hashAggrConfig;
 
@@ -226,6 +234,9 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     }
 
     this.htIdxHolder = new IntHolder();
+    this.outStartIdxHolder = new IntHolder();
+    this.outNumRecordsHolder = new IntHolder();
+    
     materializedValueFields = new MaterializedField[valueFieldIds.size()];
 
     if (valueFieldIds.size() > 0) {
@@ -239,6 +250,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     ChainedHashTable ht = new ChainedHashTable(htConfig, context, allocator, 
incoming, null /* no incoming probe */, outgoing) ;
     this.htable = ht.createAndSetupHashTable(groupByOutFieldIds) ;
 
+    numGroupByOutFields = groupByOutFieldIds.length;
     batchHolders = new ArrayList<BatchHolder>();
     addBatchHolder();
 
@@ -302,7 +314,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
               // outcome = out;
 
               buildComplete = true;
-
+              
               updateStats(htable);
 
               // output the first batch; remaining batches will be output
@@ -333,18 +345,17 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     }
   }
 
-  private void allocateOutgoing(int numOutputRecords) {
-
-    for (VectorAllocator a : keyAllocators) {
-      if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} 
records.", a, numOutputRecords);
-      a.alloc(numOutputRecords);
+  private void allocateOutgoing() {
+    // Skip the keys and only allocate for outputting the workspace values
+    // (keys will be output through splitAndTransfer)
+    Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
+    for (int i=0; i < numGroupByOutFields; i++) {
+      outgoingIter.next();
     }
-
-    for (VectorAllocator a : valueAllocators) {
-      if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} 
records.", a, numOutputRecords);
-      a.alloc(numOutputRecords);
+    while (outgoingIter.hasNext()) {
+      ValueVector vv = outgoingIter.next().getValueVector();
+      vv.allocateNew();
     }
-
   }
 
   @Override
@@ -366,6 +377,8 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     }
     htIdxHolder = null;
     materializedValueFields = null;
+    outStartIdxHolder = null;
+    outNumRecordsHolder = null;
 
     if (batchHolders != null) {
       for (BatchHolder bh : batchHolders) {
@@ -376,12 +389,6 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     }
   }
 
-  private AggOutcome tooBigFailure(){
-    context.fail(new Exception(TOO_BIG_ERROR));
-    this.outcome = IterOutcome.STOP;
-    return AggOutcome.CLEANUP_AND_RETURN;
-  }
-
   private final AggOutcome setOkAndReturn(){
     if(first){
       this.outcome = IterOutcome.OK_NEW_SCHEMA;
@@ -416,54 +423,44 @@ public abstract class HashAggTemplate implements 
HashAggregator {
 
     bh.setup();
   }
-
-  // output the keys and values for a particular batch holder
-  private boolean outputKeysAndValues(int batchIdx) {
-
-    allocateOutgoing(batchIdx);
-
-    if (! this.htable.outputKeys(batchIdx)) {
-      return false;
-    }
-    if (! batchHolders.get(batchIdx).outputValues()) {
-      return false;
-    }
-
-    outBatchIndex = batchIdx+1;
-
-    if (outBatchIndex == batchHolders.size()) {
-      allFlushed = true;
-    }
-
-    return true;
-  }
-
+  
   public IterOutcome outputCurrentBatch() {
     if (outBatchIndex >= batchHolders.size()) {
       this.outcome = IterOutcome.NONE;
       return outcome;
     }
 
-    // get the number of groups in the batch holder corresponding to this 
batch index
-    int batchOutputRecords = batchHolders.get(outBatchIndex).getNumGroups();
+    // get the number of records in the batch holder that are pending output
+    int numPendingOutput = 
batchHolders.get(outBatchIndex).getNumPendingOutput();
     
-    if (!first && batchOutputRecords == 0) {
+    if (!first && numPendingOutput == 0) {
       this.outcome = IterOutcome.NONE;
       return outcome;
     }
 
-    allocateOutgoing(batchOutputRecords);
+    allocateOutgoing();
 
-    boolean outputKeysStatus = this.htable.outputKeys(outBatchIndex) ;
-    boolean outputValuesStatus = 
batchHolders.get(outBatchIndex).outputValues();
+    boolean outputKeysStatus = true;
+    boolean outputValuesStatus = true;
+    
+    outputValuesStatus = 
batchHolders.get(outBatchIndex).outputValues(outStartIdxHolder, 
outNumRecordsHolder);
+    int numOutputRecords = outNumRecordsHolder.value;
+    
+    if (EXTRA_DEBUG_1) {
+      logger.debug("After output values: outStartIdx = {}, outNumRecords = 
{}", outStartIdxHolder.value, outNumRecordsHolder.value);
+    }
+    if (outputValuesStatus) {
+      outputKeysStatus = this.htable.outputKeys(outBatchIndex, 
this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value) ;
+    }
+    
     if (outputKeysStatus && outputValuesStatus) {
 
       // set the value count for outgoing batch value vectors
       for(VectorWrapper<?> v : outgoing) {
-        v.getValueVector().getMutator().setValueCount(batchOutputRecords);
+        v.getValueVector().getMutator().setValueCount(numOutputRecords);
       }
 
-      outputCount += batchOutputRecords;
+      outputCount += numOutputRecords;
 
       if(first){
         this.outcome = IterOutcome.OK_NEW_SCHEMA;
@@ -471,9 +468,9 @@ public abstract class HashAggTemplate implements 
HashAggregator {
         this.outcome = IterOutcome.OK;
       }
 
-      logger.debug("HashAggregate: Output current batch index {} with {} 
records.", outBatchIndex, batchOutputRecords);
+      logger.debug("HashAggregate: Output current batch index {} with {} 
records.", outBatchIndex, numOutputRecords);
 
-      lastBatchOutputCount = batchOutputRecords;
+      lastBatchOutputCount = numOutputRecords;
       outBatchIndex++;
       if (outBatchIndex == batchHolders.size()) {
         allFlushed = true;
@@ -484,8 +481,20 @@ public abstract class HashAggTemplate implements 
HashAggregator {
         this.cleanup();
       }
     } else {
-      if (!outputKeysStatus) context.fail(new Exception("Failed to output keys 
for current batch !"));
-      if (!outputValuesStatus) context.fail(new Exception("Failed to output 
values for current batch !"));
+      if (!outputKeysStatus) {
+        logger.debug("Failed to output keys for current batch index: {} ", 
outBatchIndex); 
+        for(VectorWrapper<?> v : outContainer) {
+          logger.debug("At the time of failure, size of valuevector in 
outContainer = {}.", v.getValueVector().getValueCapacity());
+        }        
+        context.fail(new Exception("Failed to output keys for current batch 
!"));
+      }
+      if (!outputValuesStatus) {
+        logger.debug("Failed to output values for current batch index: {} ", 
outBatchIndex);
+        for(VectorWrapper<?> v : outContainer) {
+          logger.debug("At the time of failure, size of valuevector in 
outContainer = {}.", v.getValueVector().getValueCapacity());
+        }
+        context.fail(new Exception("Failed to output values for current batch 
!"));
+      }
       this.outcome = IterOutcome.STOP;
     }
 
@@ -524,7 +533,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     }
     */
 
-    HashTable.PutStatus putStatus = htable.put(incomingRowIdx, htIdxHolder) ;
+    HashTable.PutStatus putStatus = htable.put(incomingRowIdx, htIdxHolder, 1 
/* retry count */) ;
 
     if (putStatus != HashTable.PutStatus.PUT_FAILED) {
       int currentIdx = htIdxHolder.value;
@@ -561,6 +570,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
 
     }
 
+    logger.debug("HashAggr Put failed ! incomingRowIdx = {}, hash table size = 
{}.", incomingRowIdx, htable.size());
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index b94f299..421bd53 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -33,6 +33,7 @@ import 
org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 public interface HashAggregator {
@@ -48,7 +49,7 @@ public interface HashAggregator {
                              HashAggBatch outgoing, LogicalExpression[] 
valueExprs, 
                              List<TypedFieldId> valueFieldIds,
                              TypedFieldId[] keyFieldIds,
-                             VectorAllocator[] keyAllocators, 
VectorAllocator[] valueAllocators)
+                             VectorContainer outContainer) 
     throws SchemaChangeException, IOException, ClassTransformationException;
 
   public abstract IterOutcome getOutcome();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index e1179d0..7522488 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -141,7 +141,7 @@ public class ChainedHashTable {
       final MaterializedField outputField = 
MaterializedField.create(ne.getRef(), expr.getMajorType());
       // create a type-specific ValueVector for this key
       ValueVector vv = TypeHelper.getNewVector(outputField, allocator);
-      VectorAllocator.getAllocator(vv, 50 /* avg width 
*/).alloc(HashTable.BATCH_SIZE);
+      vv.allocateNew();
       htKeyFieldIds[i] = htContainerOrig.add(vv);
       
       i++;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 9f5d4f8..375836a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.common;
 
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.IntHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -48,7 +49,7 @@ public interface HashTable {
                     RecordBatch incomingBuild, RecordBatch incomingProbe, 
                     RecordBatch outgoing, VectorContainer htContainerOrig);
 
-  public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder);
+  public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder, int 
retryCount);
   
   public int containsKey(int incomingRowIdx, boolean isProbe);
 
@@ -60,7 +61,7 @@ public interface HashTable {
 
   public void clear();
 
-  public boolean outputKeys(int batchIdx);
+  public boolean outputKeys(int batchIdx, VectorContainer outContainer, int 
outStartIndex, int numRecords);
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 0849e6f..45b9852 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.common;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 
 import javax.inject.Named;
 
@@ -27,6 +28,7 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
@@ -36,7 +38,6 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 public abstract class HashTableTemplate implements HashTable {
 
@@ -124,7 +125,7 @@ public abstract class HashTableTemplate implements 
HashTable {
         htContainer = new VectorContainer();
         for (VectorWrapper<?> w : htContainerOrig) {
           ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator);
-          VectorAllocator.getAllocator(vv, 50 /* avg width 
*/).alloc(HashTable.BATCH_SIZE);
+          vv.allocateNew();
           htContainer.add(vv);
         }
       }
@@ -275,14 +276,28 @@ public abstract class HashTableTemplate implements 
HashTable {
       hashValues = newHashValues;
     }
 
-    private boolean outputKeys() {
+    private boolean outputKeys(VectorContainer outContainer, int 
outStartIndex, int numRecords) {
 
       /** for debugging
         BigIntVector vv0 = getValueVector(0);
         BigIntHolder holder = new BigIntHolder();
       */
-
-      for (int i = 0; i <= maxOccupiedIdx; i++) {
+      
+      // set the value count for htContainer's value vectors before the 
transfer ..
+      setValueCount();
+      
+      Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
+      
+      for (VectorWrapper<?> sourceWrapper : htContainer) {
+        ValueVector sourceVV = sourceWrapper.getValueVector();
+        ValueVector targetVV = outgoingIter.next().getValueVector();
+        TransferPair tp = sourceVV.makeTransferPair(targetVV);
+        tp.splitAndTransfer(outStartIndex, numRecords);
+      }
+      
+/*
+      logger.debug("Attempting to output keys for batch index: {} from index 
{} to maxOccupiedIndex {}.", this.batchIndex, 0, maxOccupiedIdx);
+      for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
         if (outputRecordKeys(i, batchOutputCount) ) {
           if (EXTRA_DEBUG) logger.debug("Outputting keys to output index: {}", 
batchOutputCount) ;
 
@@ -297,9 +312,17 @@ public abstract class HashTableTemplate implements 
HashTable {
           return false;
         }
       }
+ */     
       return true;
     }
 
+    private void setValueCount() {
+      for (VectorWrapper<?> vw : htContainer) {
+        ValueVector vv = vw.getValueVector();
+        vv.getMutator().setValueCount(maxOccupiedIdx + 1); 
+      }
+    }
+    
     private void dump(int idx) {
       while (true) {
         int idxWithinBatch = idx & BATCH_MASK;
@@ -443,8 +466,23 @@ public abstract class HashTableTemplate implements 
HashTable {
 
         return rounded;
   }
+  
+  public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder, int 
retryCount) {
+    HashTable.PutStatus putStatus = put(incomingRowIdx, htIdxHolder) ;
+    int count = retryCount;
+    int numBatchHolders;
+    while (putStatus == PutStatus.PUT_FAILED && count > 0) {
+      logger.debug("Put into hash table failed .. Retrying with new batch 
holder...");
+      numBatchHolders = batchHolders.size();
+      this.addBatchHolder();
+      freeIndex = numBatchHolders * BATCH_SIZE;
+      putStatus = put(incomingRowIdx, htIdxHolder);
+      count--;
+    }
+    return putStatus;
+  }
 
-  public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder) {
+  private PutStatus put(int incomingRowIdx, IntHolder htIdxHolder) {
 
     int hash = getHashBuild(incomingRowIdx);
     hash = Math.abs(hash);
@@ -467,7 +505,8 @@ public abstract class HashTableTemplate implements 
HashTable {
 
       if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, 
lastEntryIdxWithinBatch)) {
         // update the start index array
-        startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), 
currentIdx);
+        boolean status = 
startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), 
currentIdx);
+        assert status : "Unable to set start indices in the hash table.";
         htIdxHolder.value = currentIdx;
         return PutStatus.KEY_ADDED;
       }
@@ -649,9 +688,9 @@ public abstract class HashTableTemplate implements 
HashTable {
     numResizing++;
   }
 
-  public boolean outputKeys(int batchIdx) {
+  public boolean outputKeys(int batchIdx, VectorContainer outContainer, int 
outStartIndex, int numRecords) {
     assert batchIdx < batchHolders.size();
-    if (! batchHolders.get(batchIdx).outputKeys()) {
+    if (! batchHolders.get(batchIdx).outputKeys(outContainer, outStartIndex, 
numRecords)) {
       return false;
     }
     return true;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index e24f250..5fc3125 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -333,7 +333,7 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
                     // For every record in the build batch , hash the key 
columns
                     for (int i = 0; i < currentRecordCount; i++) {
 
-                        HashTable.PutStatus status = hashTable.put(i, htIndex);
+                        HashTable.PutStatus status = hashTable.put(i, htIndex, 
1 /* retry count */);
 
                         if (status != HashTable.PutStatus.PUT_FAILED) {
                             /* Use the global index returned by the hash 
table, to store

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 14db66c..2fcdef3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -201,7 +201,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
      * Finally, Make sure that the no rels are repeats.
      * This could happen in the case of querying the same table twice as Optiq 
may canonicalize these.
      */
-    phyRelNode = 
ExcessiveExchangeIdentifier.removeExcessiveEchanges(phyRelNode, 
targetSliceSize);
+    phyRelNode = RelUniqifier.uniqifyGraph(phyRelNode);
 
     return phyRelNode;
   }

Reply via email to