[ https://issues.apache.org/jira/browse/DRILL-6516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535489#comment-16535489 ]

ASF GitHub Bot commented on DRILL-6516:
---------------------------------------

Ben-Zvi commented on a change in pull request #1358: DRILL-6516: EMIT support in streaming agg
URL: https://github.com/apache/drill/pull/1358#discussion_r199963330
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
 ##########
 @@ -154,83 +188,174 @@ public void buildSchema() throws SchemaChangeException {
   public IterOutcome innerNext() {
 
     // if a special batch has been sent, we have no data in the incoming so exit early
-    if (specialBatchSent) {
-      return IterOutcome.NONE;
+    if ( done || specialBatchSent) {
+      return NONE;
+    }
+
+    // We sent an OK_NEW_SCHEMA and also encountered the end of a data set. So we need to send
+    // an EMIT with an empty batch now
+    if (sendEmit) {
+      sendEmit = false;
+      firstBatchForDataSet = true;
+      recordCount = 0;
+      return EMIT;
     }
 
     // this is only called on the first batch. Beyond this, the aggregator manages batches.
     if (aggregator == null || first) {
-      IterOutcome outcome;
       if (first && incoming.getRecordCount() > 0) {
         first = false;
-        outcome = IterOutcome.OK_NEW_SCHEMA;
+        lastKnownOutcome = OK_NEW_SCHEMA;
       } else {
-        outcome = next(incoming);
+        lastKnownOutcome = next(incoming);
       }
-      logger.debug("Next outcome of {}", outcome);
-      switch (outcome) {
-      case NONE:
-        if (first && popConfig.getKeys().size() == 0) {
+      logger.debug("Next outcome of {}", lastKnownOutcome);
+      switch (lastKnownOutcome) {
+        case NONE:
+          if (firstBatchForDataSet && popConfig.getKeys().size() == 0) {
+            // if we have a straight aggregate and empty input batch, we need to handle it in a different way
+            constructSpecialBatch();
+            // set state to indicate the fact that we have sent a special batch and input is empty
+            specialBatchSent = true;
+            // If outcome is NONE then we send the special batch in the first iteration and the NONE
+            // outcome in the next iteration. If outcome is EMIT, we can send the special
+            // batch and the EMIT outcome at the same time.
+            return getFinalOutcome();
+          }
+          // else fall thru
+        case OUT_OF_MEMORY:
+        case NOT_YET:
+        case STOP:
+          return lastKnownOutcome;
+        case OK_NEW_SCHEMA:
+          if (!createAggregator()) {
+            done = true;
+            return IterOutcome.STOP;
+          }
+          break;
+        case EMIT:
+          if (firstBatchForDataSet && popConfig.getKeys().size() == 0) {
+            // if we have a straight aggregate and empty input batch, we need to handle it in a different way
+            constructSpecialBatch();
+            // set state to indicate the fact that we have sent a special batch and input is empty
+            specialBatchSent = true;
+            firstBatchForDataSet = true; // reset on the next iteration
+            // If outcome is NONE then we send the special batch in the first iteration and the NONE
+            // outcome in the next iteration. If outcome is EMIT, we can send the special
+            // batch and the EMIT outcome at the same time.
+            return getFinalOutcome();
+          }
+          // else fall thru
+        case OK:
+          break;
+        default:
+          throw new IllegalStateException(String.format("unknown outcome %s", lastKnownOutcome));
+      }
+    } else {
+      if ( lastKnownOutcome != NONE && firstBatchForDataSet && !aggregator.isDone()) {
+        lastKnownOutcome = incoming.next();
+        if (!first && firstBatchForDataSet) {
 
 Review comment:
  Isn't `firstBatchForDataSet` guaranteed true here (due to the enclosing `if()`)?
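One way to check the reviewer's point is to enumerate every combination of the booleans involved. This is a hypothetical, stand-alone sketch and not code from the PR. The local booleans stand in for the fields in the diff. `notNone` models `lastKnownOutcome != NONE` and `notDone` models `!aggregator.isDone()`. The sketch assumes that the intervening `incoming.next()` call does not reassign `firstBatchForDataSet`.

```java
// Hypothetical check of the review comment: inside a branch guarded by
// `... && firstBatchForDataSet && ...`, is `!first && firstBatchForDataSet`
// equivalent to plain `!first`? Assumes nothing between the two if()s
// reassigns firstBatchForDataSet.
class RedundantConditionCheck {

  // Returns true if the nested condition and its simplification agree for
  // every combination of inputs under which the enclosing if() is taken.
  static boolean nestedCheckIsRedundant() {
    boolean[] values = {false, true};
    for (boolean notNone : values) {              // lastKnownOutcome != NONE
      for (boolean firstBatchForDataSet : values) {
        for (boolean notDone : values) {          // !aggregator.isDone()
          for (boolean first : values) {
            boolean outerTaken = notNone && firstBatchForDataSet && notDone;
            if (!outerTaken) {
              continue; // the nested if() is unreachable for this combination
            }
            boolean asWritten = !first && firstBatchForDataSet;
            boolean simplified = !first;
            if (asWritten != simplified) {
              return false;
            }
          }
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(nestedCheckIsRedundant()); // prints "true"
  }
}
```

If the assumption holds, the nested test could be reduced to `if (!first)` without changing behavior.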
   



> Support for EMIT outcome in streaming agg
> -----------------------------------------
>
>                 Key: DRILL-6516
>                 URL: https://issues.apache.org/jira/browse/DRILL-6516
>             Project: Apache Drill
>          Issue Type: Improvement
>            Reporter: Parth Chandra
>            Assignee: Parth Chandra
>            Priority: Major
>             Fix For: 1.14.0
>
>
> Update the streaming aggregator to recognize the EMIT outcome
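The following small, hypothetical model makes the intended EMIT semantics concrete for a straight (no grouping key) COUNT(*) aggregate. It is not Drill's `StreamingAggBatch`. It ignores schemas, memory and `OK_NEW_SCHEMA`, and all class and method names are illustrative. It makes three assumptions:
- Each EMIT closes a data set.
- An empty data set still produces one row with a count of 0, mirroring the "special batch" in the diff.
- When the whole input is empty, that row is reported with `OK` before `NONE`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a straight (no GROUP BY key) COUNT(*) aggregate that
// understands EMIT. Illustrative only; not Drill's StreamingAggBatch.
class EmitAwareCountSketch {

  // Subset of upstream outcomes modeled here; names mirror Drill's IterOutcome.
  enum Outcome { OK, EMIT, NONE }

  // One upstream batch: how many rows it carries and the outcome it arrived with.
  static final class Batch {
    final int rows;
    final Outcome outcome;

    Batch(int rows, Outcome outcome) {
      this.rows = rows;
      this.outcome = outcome;
    }
  }

  // Returns the downstream results as "count=<n>/<OUTCOME>" strings, plus "NONE".
  static List<String> run(List<Batch> upstream) {
    List<String> out = new ArrayList<>();
    long count = 0;            // running COUNT(*) for the current data set
    boolean pending = false;   // current data set has seen an OK batch
    boolean anyOutput = false; // at least one data set has been emitted
    for (Batch b : upstream) {
      switch (b.outcome) {
        case OK:
          count += b.rows;
          pending = true;
          break;
        case EMIT:
          // End of a data set: send its aggregate together with EMIT. An empty
          // data set still yields one row (count=0), like the "special batch".
          count += b.rows;
          out.add("count=" + count + "/EMIT");
          anyOutput = true;
          count = 0;
          pending = false;
          break;
        case NONE:
          // End of all input: flush an unfinished data set, or send the special
          // batch if the input was completely empty (assumed OK), then NONE.
          if (pending || !anyOutput) {
            out.add("count=" + count + "/OK");
          }
          out.add("NONE");
          return out;
      }
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(run(List.of(
        new Batch(3, Outcome.OK), new Batch(2, Outcome.EMIT),
        new Batch(0, Outcome.EMIT), new Batch(0, Outcome.NONE))));
    // prints [count=5/EMIT, count=0/EMIT, NONE]
    System.out.println(run(List.of(new Batch(0, Outcome.NONE))));
    // prints [count=0/OK, NONE]
  }
}
```

The second call mirrors the NONE path described in the diff: the special batch goes out on one call and NONE on the next. In the EMIT path, the special batch and EMIT travel together.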



