[ 
https://issues.apache.org/jira/browse/DRILL-6516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535496#comment-16535496
 ] 

ASF GitHub Bot commented on DRILL-6516:
---------------------------------------

Ben-Zvi commented on a change in pull request #1358: DRILL-6516: EMIT support in streaming agg
URL: https://github.com/apache/drill/pull/1358#discussion_r199965277
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
 ##########
 @@ -154,83 +188,174 @@ public void buildSchema() throws SchemaChangeException {
   public IterOutcome innerNext() {
 
     // if a special batch has been sent, we have no data in the incoming so 
exit early
-    if (specialBatchSent) {
-      return IterOutcome.NONE;
+    if ( done || specialBatchSent) {
+      return NONE;
+    }
+
+    // We sent an OK_NEW_SCHEMA and also encountered the end of a data set. So 
we need to send
+    // an EMIT with an empty batch now
+    if (sendEmit) {
+      sendEmit = false;
+      firstBatchForDataSet = true;
+      recordCount = 0;
+      return EMIT;
     }
 
     // this is only called on the first batch. Beyond this, the aggregator 
manages batches.
     if (aggregator == null || first) {
-      IterOutcome outcome;
       if (first && incoming.getRecordCount() > 0) {
         first = false;
-        outcome = IterOutcome.OK_NEW_SCHEMA;
+        lastKnownOutcome = OK_NEW_SCHEMA;
       } else {
-        outcome = next(incoming);
+        lastKnownOutcome = next(incoming);
       }
-      logger.debug("Next outcome of {}", outcome);
-      switch (outcome) {
-      case NONE:
-        if (first && popConfig.getKeys().size() == 0) {
+      logger.debug("Next outcome of {}", lastKnownOutcome);
+      switch (lastKnownOutcome) {
+        case NONE:
+          if (firstBatchForDataSet && popConfig.getKeys().size() == 0) {
+            // if we have a straight aggregate and empty input batch, we need 
to handle it in a different way
+            constructSpecialBatch();
+            // set state to indicate the fact that we have sent a special 
batch and input is empty
+            specialBatchSent = true;
+            // If outcome is NONE then we send the special batch in the first 
iteration and the NONE
+            // outcome in the next iteration. If outcome is EMIT, we can send 
the special
+            // batch and the EMIT outcome at the same time.
+            return getFinalOutcome();
+          }
+          // else fall thru
+        case OUT_OF_MEMORY:
+        case NOT_YET:
+        case STOP:
+          return lastKnownOutcome;
+        case OK_NEW_SCHEMA:
+          if (!createAggregator()) {
+            done = true;
+            return IterOutcome.STOP;
+          }
+          break;
+        case EMIT:
+          if (firstBatchForDataSet && popConfig.getKeys().size() == 0) {
+            // if we have a straight aggregate and empty input batch, we need 
to handle it in a different way
+            constructSpecialBatch();
+            // set state to indicate the fact that we have sent a special 
batch and input is empty
+            specialBatchSent = true;
+            firstBatchForDataSet = true; // reset on the next iteration
+            // If outcome is NONE then we send the special batch in the first 
iteration and the NONE
+            // outcome in the next iteration. If outcome is EMIT, we can send 
the special
+            // batch and the EMIT outcome at the same time.
+            return getFinalOutcome();
+          }
+          // else fall thru
+        case OK:
+          break;
+        default:
+          throw new IllegalStateException(String.format("unknown outcome %s", 
lastKnownOutcome));
+      }
+    } else {
+      if ( lastKnownOutcome != NONE && firstBatchForDataSet && 
!aggregator.isDone()) {
+        lastKnownOutcome = incoming.next();
+        if (!first && firstBatchForDataSet) {
+          //Setup needs to be called again. During setup, generated code saves 
a reference to the vectors
+          // pointed to by the incoming batch so that the dereferencing of the 
vector wrappers to get to
+          // the vectors  does not have to be done at each call to eval. 
However, after an EMIT is seen,
+          // the vectors are replaced and the reference to the old vectors is 
no longer valid
+          try {
+            aggregator.setup(oContext, incoming, this);
+          } catch (SchemaChangeException e) {
+            UserException.Builder exceptionBuilder = 
UserException.functionError(e)
+                .message("A Schema change exception occured in calling setup() 
in generated code.");
+            throw exceptionBuilder.build(logger);
+          }
+        }
+      }
+      // We sent an EMIT in the previous iteration, so we must be starting a 
new data set
+      if (firstBatchForDataSet) {
+        done = false;
+        sendEmit = false;
+        specialBatchSent = false;
+        firstBatchForDataSet = false;
+      }
+    }
+    AggOutcome aggOutcome = aggregator.doWork(lastKnownOutcome);
+    recordCount = aggregator.getOutputCount();
+    container.setRecordCount(recordCount);
+    logger.debug("Aggregator response {}, records {}", aggOutcome, 
aggregator.getOutputCount());
+    // overwrite the outcome variable since we no longer need to remember the 
first batch outcome
+    lastKnownOutcome = aggregator.getOutcome();
+    switch (aggOutcome) {
+      case CLEANUP_AND_RETURN:
+        if (!first) {
+          container.zeroVectors();
+        }
+        done = true;
+        ExternalSortBatch.releaseBatches(incoming);
+        return lastKnownOutcome;
+      case RETURN_AND_RESET:
+        //WE could have got a string of batches, all empty, until we hit an 
emit
+        if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && 
recordCount == 0) {
           // if we have a straight aggregate and empty input batch, we need to 
handle it in a different way
           constructSpecialBatch();
-          first = false;
           // set state to indicate the fact that we have sent a special batch 
and input is empty
           specialBatchSent = true;
-          return IterOutcome.OK;
+          firstBatchForDataSet = true; // reset on the next iteration
 
 Review comment:
   Isn't `firstBatchForDataSet` already guaranteed to be *true* here, since the enclosing `if` condition checks it? If so, this assignment is redundant.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support for EMIT outcome in streaming agg
> -----------------------------------------
>
>                 Key: DRILL-6516
>                 URL: https://issues.apache.org/jira/browse/DRILL-6516
>             Project: Apache Drill
>          Issue Type: Improvement
>            Reporter: Parth Chandra
>            Assignee: Parth Chandra
>            Priority: Major
>             Fix For: 1.14.0
>
>
> Update the streaming aggregator to recognize the EMIT outcome



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to