more logging in streaming agg

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/31283c32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/31283c32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/31283c32

Branch: refs/heads/master
Commit: 31283c32299fd9607f870ceb95518e0c33515132
Parents: 4935b19
Author: Jacques Nadeau <jacq...@apache.org>
Authored: Sat Jun 7 17:56:21 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Sat Jun 7 17:56:21 2014 -0700

----------------------------------------------------------------------
 .../impl/aggregate/StreamingAggTemplate.java    | 86 +++++++++++---------
 1 file changed, 47 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31283c32/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index e3eb6fe..48e3100 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -61,14 +61,14 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
     this.currentIndex = this.getVectorIndex(underlyingIndex);
   }
 
-  
+
   private void allocateOutgoing() {
     for (VectorAllocator a : allocators) {
       if(EXTRA_DEBUG) logger.debug("Allocating {} with {} records.", a, 20000);
       a.alloc(20000);
     }
   }
-  
+
   @Override
   public IterOutcome getOutcome() {
     return outcome;
@@ -84,16 +84,16 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
     this.outcome = IterOutcome.STOP;
     return AggOutcome.CLEANUP_AND_RETURN;
   }
-  
+
   @Override
   public AggOutcome doWork() {
     try{ // outside loop to ensure that first is set to false after the first 
run.
-      
+
       // if we're in the first state, allocate outgoing.
       if(first){
         allocateOutgoing();
       }
-      
+
       // pick up a remainder batch if we have one.
       if(remainderBatch != null){
         if (!outputToBatch( previousIndex )) return tooBigFailure();
@@ -101,24 +101,25 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
         remainderBatch = null;
         return setOkAndReturn();
       }
-      
-      
+
+
       // setup for new output and pick any remainder.
       if (pendingOutput) {
         allocateOutgoing();
         pendingOutput = false;
+        if(EXTRA_DEBUG) logger.debug("Attempting to output remainder.");
         if (!outputToBatch( previousIndex)) return tooBigFailure();
       }
-  
+
       if(newSchema){
         return AggOutcome.UPDATE_AGGREGATOR;
       }
-      
+
       if(lastOutcome != null){
         outcome = lastOutcome;
         return AggOutcome.CLEANUP_AND_RETURN;
       }
-      
+
       outside: while(true){
       // loop through existing records, adding as necessary.
         for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
@@ -138,21 +139,21 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
             } else {
               if(EXTRA_DEBUG) logger.debug("Output failed.");
               if(outputCount == 0) return tooBigFailure();
-              
+
               // mark the pending output but move forward for the next cycle.
               pendingOutput = true;
               previousIndex = currentIndex;
               incIndex();
               return setOkAndReturn();
-              
+
             }
           }
           previousIndex = currentIndex;
         }
-        
-        
+
+
         InternalBatch previous = null;
-        
+
         try{
           while(true){
             previous = new InternalBatch(incoming);
@@ -169,13 +170,13 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
                 outcome = out;
                 return AggOutcome.CLEANUP_AND_RETURN;
               }
-              
 
-              
+
+
             case NOT_YET:
               this.outcome = out;
               return AggOutcome.RETURN_OUTCOME;
-              
+
             case OK_NEW_SCHEMA:
               if(EXTRA_DEBUG) logger.debug("Received new schema.  Batch has {} 
records.", incoming.getRecordCount());
               if(addedRecordCount > 0){
@@ -185,7 +186,7 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
                 return setOkAndReturn();
               }
               cleanup();
-              return AggOutcome.UPDATE_AGGREGATOR;   
+              return AggOutcome.UPDATE_AGGREGATOR;
             case OK:
               resetIndex();
               if(incoming.getRecordCount() == 0){
@@ -204,7 +205,7 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
                   if(addedRecordCount > 0){
                     if( !outputToBatchPrev( previous, previousIndex, 
outputCount) ){
                       remainderBatch = previous;
-                      return setOkAndReturn(); 
+                      return setOkAndReturn();
                     }
                     continue outside;
                   }
@@ -229,10 +230,10 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
     }finally{
       if(first) first = !first;
     }
-    
+
   }
-  
-  
+
+
   private final void incIndex(){
     underlyingIndex++;
     if(underlyingIndex >= incoming.getRecordCount()){
@@ -241,12 +242,12 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
     }
     currentIndex = getVectorIndex(underlyingIndex);
   }
-  
+
   private final void resetIndex(){
     underlyingIndex = -1;
     incIndex();
   }
-  
+
   private final AggOutcome setOkAndReturn(){
     if(first){
       this.outcome = IterOutcome.OK_NEW_SCHEMA;
@@ -260,16 +261,22 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
   }
 
   private final boolean outputToBatch(int inIndex){
-    boolean success = outputRecordKeys(inIndex, outputCount) //
-        && outputRecordValues(outputCount) //
-        && resetValues();
-    if(success){
-      if(EXTRA_DEBUG) logger.debug("Outputting values to {}", outputCount);
-      outputCount++;
-      addedRecordCount = 0;
+
+    if(!outputRecordKeys(inIndex, outputCount)){
+      if(EXTRA_DEBUG) logger.debug("Failure while outputting keys {}", 
outputCount);
+      return false;
     }
-    
-    return success;
+
+    if(!outputRecordValues(outputCount)){
+      if(EXTRA_DEBUG) logger.debug("Failure while outputting values {}", 
outputCount);
+      return false;
+    }
+
+    if(EXTRA_DEBUG) logger.debug("{} values output successfully", outputCount);
+    resetValues();
+    outputCount++;
+    addedRecordCount = 0;
+    return true;
   }
 
   private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int 
outIndex){
@@ -277,21 +284,22 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
         && outputRecordValues(outIndex) //
         && resetValues();
     if(success){
+      resetValues();
       outputCount++;
       addedRecordCount = 0;
     }
-    
+
     return success;
   }
-  
+
   private void addRecordInc(int index){
     addRecord(index);
     this.addedRecordCount++;
   }
-  
+
   @Override
   public void cleanup(){
-    if(remainderBatch != null) remainderBatch.clear(); 
+    if(remainderBatch != null) remainderBatch.clear();
   }
 
 
@@ -304,5 +312,5 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
   public abstract boolean outputRecordValues(@Named("outIndex") int outIndex);
   public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
   public abstract boolean resetValues();
-  
+
 }

Reply via email to