Fix an issue with keys output for multiple output batches of hash aggregate.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e7115e91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e7115e91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e7115e91

Branch: refs/heads/master
Commit: e7115e91875b5453ca24e7e5995c5eba5f452e29
Parents: 5baa4ad
Author: Aman Sinha <asi...@maprtech.com>
Authored: Sat Jun 7 19:41:54 2014 -0700
Committer: Steven Phillips <sphill...@maprtech.com>
Committed: Sat Jun 7 19:43:10 2014 -0700

----------------------------------------------------------------------
 .../impl/aggregate/HashAggTemplate.java         | 52 +++++---------------
 .../physical/impl/common/HashTableTemplate.java | 21 +++-----
 2 files changed, 19 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e7115e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index b65acb0..2fb3f02 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -74,6 +74,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   private int outputCount = 0;
   private int numGroupedRecords = 0;
   private int outBatchIndex = 0;
+  private int lastBatchOutputCount = 0;
   private RecordBatch incoming;
   private BatchSchema schema;
   private RecordBatch outgoing;
@@ -97,8 +98,8 @@ public abstract class HashAggTemplate implements HashAggregator {
   public class BatchHolder {
 
     private VectorContainer aggrValuesContainer; // container for aggr values (workspace variables)
-    int maxOccupiedIdx = -1;
-    int batchOutputCount = 0;
+    private int maxOccupiedIdx = -1;
+    private int batchOutputCount = 0;
 
     private BatchHolder() {
 
@@ -130,7 +131,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     private boolean outputValues() {
       for (int i = 0; i <= maxOccupiedIdx; i++) {
         if (outputRecordValues(i, batchOutputCount) ) {
-          if (EXTRA_DEBUG_2) logger.debug("Outputting values to batch index: {} output index: {}", batchOutputCount) ;
+          if (EXTRA_DEBUG_2) logger.debug("Outputting values to output index: {}", batchOutputCount) ;
           batchOutputCount++;
         } else {
           return false;
@@ -275,18 +276,10 @@ public abstract class HashAggTemplate implements HashAggregator {
               
               buildComplete = true;
               
-              // outputKeysAndValues() ;
-
               // output the first batch; remaining batches will be output 
               // in response to each next() call by a downstream operator
               
-              // outputKeysAndValues(outBatchIndex);
               outputCurrentBatch();
-
-              //if (isLastBatchOutput()) {
-                // cleanup my internal state since there is nothing more to return
-              //  this.cleanup();
-              // }
               
               // cleanup incoming batch since output of aggregation does not need
               // any references to the incoming
@@ -332,8 +325,8 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   @Override
   public int getOutputCount() {
-    return outputCount;
-    // return batchHolders.get(outBatchIndex).getOutputCount();
+    // return outputCount;
+    return lastBatchOutputCount;
   }
 
   @Override
@@ -394,29 +387,6 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     bh.setup();
   }
-
-  /*
-  private boolean outputKeysAndValues() {
-
-    allocateOutgoing();
-
-    int batchIdx = 0;
-    for (BatchHolder bh : batchHolders) {
-      if (! this.htable.outputKeys(batchIdx++)) {
-        return false;
-      }
-    }
-
-    for (BatchHolder bh : batchHolders) {
-      if (! bh.outputValues() ) {
-        return false;
-      }
-    }
-    
-    allFlushed = true ;
-    return true;
-  }
-*/
   
   // output the keys and values for a particular batch holder
   private boolean outputKeysAndValues(int batchIdx) {
@@ -455,8 +425,9 @@ public abstract class HashAggTemplate implements HashAggregator {
     
     allocateOutgoing(batchOutputRecords);
     
-    if (this.htable.outputKeys(outBatchIndex) 
-        && batchHolders.get(outBatchIndex).outputValues()) {
+    boolean outputKeysStatus = this.htable.outputKeys(outBatchIndex) ;
+    boolean outputValuesStatus = batchHolders.get(outBatchIndex).outputValues(); 
+    if (outputKeysStatus && outputValuesStatus) {
       
       // set the value count for outgoing batch value vectors
       for(VectorWrapper<?> v : outgoing) {
@@ -472,7 +443,8 @@ public abstract class HashAggTemplate implements HashAggregator {
       }
       
       logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, batchOutputRecords);
-      
+
+      lastBatchOutputCount = batchOutputRecords;
       outBatchIndex++;
       if (outBatchIndex == batchHolders.size()) {
         allFlushed = true;
@@ -483,6 +455,8 @@ public abstract class HashAggTemplate implements HashAggregator {
         this.cleanup();
       }
     } else {
+      if (!outputKeysStatus) context.fail(new Exception("Failed to output keys for current batch !"));
+      if (!outputValuesStatus) context.fail(new Exception("Failed to output values for current batch !"));
       this.outcome = IterOutcome.STOP;
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e7115e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index f2844ac..a8af5ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -21,13 +21,8 @@ import java.util.ArrayList;
 
 import javax.inject.Named;
 
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -41,9 +36,6 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.expr.holders.BigIntHolder;
-import org.apache.drill.exec.vector.allocator.FixedVectorAllocator;
 
 public abstract class HashTableTemplate implements HashTable {
 
@@ -96,8 +88,6 @@ public abstract class HashTableTemplate implements HashTable {
 
   private MaterializedField dummyIntField;
 
-  private int outputCount = 0;
-
   // This class encapsulates the links, keys and values for up to BATCH_SIZE
   // *unique* records. Thus, suppose there are N incoming record batches, each
   // of size BATCH_SIZE..but they have M unique keys altogether, the number of
@@ -113,7 +103,8 @@ public abstract class HashTableTemplate implements HashTable {
     // Array of hash values - this is useful when resizing the hash table
     private IntVector hashValues;
 
-    int maxOccupiedIdx = -1;
+    private int maxOccupiedIdx = -1;
+    private int batchOutputCount = 0;
 
     private BatchHolder(int idx) {
 
@@ -269,16 +260,16 @@ public abstract class HashTableTemplate implements HashTable {
       */
 
       for (int i = 0; i <= maxOccupiedIdx; i++) {
-        if (outputRecordKeys(i, outputCount) ) {
-          if (EXTRA_DEBUG) logger.debug("Outputting keys to {}", outputCount) ;
+        if (outputRecordKeys(i, batchOutputCount) ) {
+          if (EXTRA_DEBUG) logger.debug("Outputting keys to output index: {}", batchOutputCount) ;
 
           // debugging
           // holder.value = vv0.getAccessor().get(i);
           // if (holder.value == 100018 || holder.value == 100021) {
-          //  logger.debug("Outputting key = {} at index - {} to outgoing index = {}.", holder.value, i, outputCount);
+          //  logger.debug("Outputting key = {} at index - {} to outgoing index = {}.", holder.value, i, batchOutputCount);
           // }
 
-          outputCount++;
+          batchOutputCount++;
         } else {
           return false;
         }

Reply via email to