[ 
https://issues.apache.org/jira/browse/DRILL-6516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16538810#comment-16538810
 ] 

ASF GitHub Bot commented on DRILL-6516:
---------------------------------------

parthchandra commented on a change in pull request #1358:  DRILL-6516: EMIT support in streaming agg
URL: https://github.com/apache/drill/pull/1358#discussion_r201170290
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
 ##########
 @@ -189,83 +209,128 @@ else if (isSame( previousIndex, currentIndex )) {
               logger.debug("Received IterOutcome of {}", out);
             }
             switch (out) {
-            case NONE:
-              done = true;
-              lastOutcome = out;
-              if (first && addedRecordCount == 0) {
-                return setOkAndReturn();
-              } else if (addedRecordCount > 0) {
-                outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
-                // (output container full or not) as we are not going to insert any more records.
-                if (EXTRA_DEBUG) {
-                  logger.debug("Received no more batches, returning.");
+              case NONE:
+                done = true;
+                lastOutcome = out;
+                if (firstBatchForDataSet && addedRecordCount == 0) {
+                  return setOkAndReturn(out);
+                } else if (addedRecordCount > 0) {
+                  outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
+                  // (output container full or not) as we are not going to insert any more records.
+                  if (EXTRA_DEBUG) {
+                    logger.debug("Received no more batches, returning.");
+                  }
+                  return setOkAndReturn(out);
+                } else {
+                  // not first batch and record Count == 0
+                  outcome = out;
+                  return AggOutcome.CLEANUP_AND_RETURN;
                 }
-                return setOkAndReturn();
-              } else {
-                if (first && out == IterOutcome.OK) {
-                  out = IterOutcome.OK_NEW_SCHEMA;
+                // EMIT is handled like OK, except that we do not loop back to process the
+                // next incoming batch; we return instead
+              case EMIT:
+                if (incoming.getRecordCount() == 0) {
+                  if (addedRecordCount > 0) {
+                    outputToBatchPrev(previous, previousIndex, outputCount);
+                  }
+                  resetIndex();
+                  return setOkAndReturn(out);
+                } else {
+                  resetIndex();
+                  if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) {
+                    if (EXTRA_DEBUG) {
+                      logger.debug("New value was same as last value of previous batch, adding.");
+                    }
+                    addRecordInc(currentIndex);
+                    previousIndex = currentIndex;
+                    incIndex();
+                    if (EXTRA_DEBUG) {
+                      logger.debug("Continuing outside");
+                    }
+                    processRemainingRecordsInBatch();
+                    // currentIndex has been reset to int_max so use previous index.
+                    outputToBatch(previousIndex);
+                    resetIndex();
+                    return setOkAndReturn(out);
+                  } else { // not the same
+                    if (EXTRA_DEBUG) {
+                      logger.debug("This is not the same as the previous, add record and continue outside.");
+                    }
+                    if (addedRecordCount > 0) {
+                      if (outputToBatchPrev(previous, previousIndex, outputCount)) {
+                        if (EXTRA_DEBUG) {
+                          logger.debug("Output container is full. flushing it.");
+                        }
 
 Review comment:
   Yes, it does appear out of place, but it isn't really. It is tied to the fact that we must reset several state variables (this being one of them) every time EMIT is processed.
   Also, while debugging I found places where I had missed resetting previousIndex, so this is a safe place to do so.
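
   To illustrate the point about resetting state on every EMIT, here is a minimal standalone sketch. It is not the code in StreamingAggTemplate: the class `EmitStateSketch`, its methods `addRecord`, `resetForEmit`, and `isClean`, and the choice of which fields count as per-EMIT state are all hypothetical. The sentinel values (-1 for "no previous index", int max for "no current index") are assumptions inferred from the checks and comments in the diff above.

```java
// Hypothetical sketch; not Drill's StreamingAggTemplate.
class EmitStateSketch {
  // Sentinel values are assumptions inferred from the diff's comments.
  static final int NO_PREVIOUS = -1;
  static final int NO_CURRENT = Integer.MAX_VALUE;

  int previousIndex = NO_PREVIOUS;
  int currentIndex = NO_CURRENT;
  int addedRecordCount = 0;

  // Simulates aggregating one record at the given index.
  void addRecord(int index) {
    previousIndex = index;
    currentIndex = index + 1;
    addedRecordCount++;
  }

  // Single place where all per-EMIT state is cleared, so that no
  // return path can forget one of the variables.
  void resetForEmit() {
    previousIndex = NO_PREVIOUS;
    currentIndex = NO_CURRENT;
    addedRecordCount = 0;
  }

  // True when no state from an earlier data set is left over.
  boolean isClean() {
    return previousIndex == NO_PREVIOUS
        && currentIndex == NO_CURRENT
        && addedRecordCount == 0;
  }

  public static void main(String[] args) {
    EmitStateSketch state = new EmitStateSketch();
    state.addRecord(0);
    state.addRecord(1);
    // An EMIT ends the current data set: clear everything before returning.
    state.resetForEmit();
    System.out.println(state.isClean());
  }
}
```

   Grouping the resets in one method is only one way to express the idea; the actual change calls resetIndex() on each EMIT return path.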

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Support for EMIT outcome in streaming agg
> -----------------------------------------
>
>                 Key: DRILL-6516
>                 URL: https://issues.apache.org/jira/browse/DRILL-6516
>             Project: Apache Drill
>          Issue Type: Improvement
>            Reporter: Parth Chandra
>            Assignee: Parth Chandra
>            Priority: Major
>             Fix For: 1.14.0
>
>
> Update the streaming aggregator to recognize the EMIT outcome



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
