This is an automated email from the ASF dual-hosted git repository. timothyfarkas pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit ee841643d0f2d746126f623d6a0c480b3011d38f Author: Parth Chandra <[email protected]> AuthorDate: Fri Jul 20 17:24:38 2018 -0700 DRILL-6631: Streaming agg causes queries with Lateral and Unnest to return incorrect results. This commit fixes issues with handling straight aggregates (no group by) with empty batches received between EMIT(s). closes #1399 --- .../physical/impl/aggregate/StreamingAggBatch.java | 39 +- .../impl/aggregate/StreamingAggTemplate.java | 2 +- .../impl/agg/TestStreamingAggEmitOutcome.java | 553 +++++++++++++++++++++ 3 files changed, 573 insertions(+), 21 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index 882c36d..70880c6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -188,16 +188,19 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { public IterOutcome innerNext() { // if a special batch has been sent, we have no data in the incoming so exit early - if ( done || specialBatchSent) { + if (done || specialBatchSent) { + assert (sendEmit != true); // if special batch sent with emit then flag will not be set return NONE; } // We sent an OK_NEW_SCHEMA and also encountered the end of a data set. 
So we need to send // an EMIT with an empty batch now if (sendEmit) { + first = false; // first is set only in the case when we see a NONE after an empty first (and only) batch sendEmit = false; firstBatchForDataSet = true; recordCount = 0; + specialBatchSent = false; return EMIT; } @@ -212,15 +215,19 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { logger.debug("Next outcome of {}", lastKnownOutcome); switch (lastKnownOutcome) { case NONE: - if (firstBatchForDataSet && popConfig.getKeys().size() == 0) { + + if (first && popConfig.getKeys().size() == 0) { // if we have a straight aggregate and empty input batch, we need to handle it in a different way + // We want to produce the special batch only if we got a NONE as the first outcome after + // OK_NEW_SCHEMA. If we get a NONE immediately after we see an EMIT, then we have already handled + // the case of the empty batch constructSpecialBatch(); // set state to indicate the fact that we have sent a special batch and input is empty specialBatchSent = true; // If outcome is NONE then we send the special batch in the first iteration and the NONE // outcome in the next iteration. If outcome is EMIT, we can send the special // batch and the EMIT outcome at the same time. - return getFinalOutcome(); + return IterOutcome.OK; } // else fall thru case OUT_OF_MEMORY: @@ -238,13 +245,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { // we have to do the special handling if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && incoming.getRecordCount() == 0) { constructSpecialBatch(); - // set state to indicate the fact that we have sent a special batch and input is empty - specialBatchSent = true; firstBatchForDataSet = true; // reset on the next iteration // If outcome is NONE then we send the special batch in the first iteration and the NONE // outcome in the next iteration. 
If outcome is EMIT, we can send the special - // batch and the EMIT outcome at the same time. - return getFinalOutcome(); + // batch and the EMIT outcome at the same time. (unless the finalOutcome is OK_NEW_SCHEMA) + IterOutcome finalOutcome = getFinalOutcome(); + return finalOutcome; } // else fall thru case OK: @@ -269,13 +275,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } } } - // We sent an EMIT in the previous iteration, so we must be starting a new data set - if (firstBatchForDataSet) { - done = false; - sendEmit = false; - specialBatchSent = false; - firstBatchForDataSet = false; - } } AggOutcome aggOutcome = aggregator.doWork(lastKnownOutcome); recordCount = aggregator.getOutputCount(); @@ -296,14 +295,15 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && recordCount == 0) { // if we have a straight aggregate and empty input batch, we need to handle it in a different way constructSpecialBatch(); - // set state to indicate the fact that we have sent a special batch and input is empty - specialBatchSent = true; // If outcome is NONE then we send the special batch in the first iteration and the NONE // outcome in the next iteration. If outcome is EMIT, we can send the special // batch and the EMIT outcome at the same time. - return getFinalOutcome(); + + IterOutcome finalOutcome = getFinalOutcome(); + return finalOutcome; } firstBatchForDataSet = true; + firstBatchForSchema = false; if(first) { first = false; } @@ -332,9 +332,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } } else if (lastKnownOutcome == OK && first) { lastKnownOutcome = OK_NEW_SCHEMA; - } else if (lastKnownOutcome != IterOutcome.OUT_OF_MEMORY) { - first = false; } + first = false; return lastKnownOutcome; case UPDATE_AGGREGATOR: // We could get this either between data sets or within a data set. 
@@ -629,12 +628,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } if (firstBatchForSchema) { outcomeToReturn = OK_NEW_SCHEMA; + sendEmit = true; firstBatchForSchema = false; } else if (lastKnownOutcome == EMIT) { firstBatchForDataSet = true; outcomeToReturn = EMIT; } else { - // get the outcome to return before calling refresh since that resets the lastKnowOutcome to OK outcomeToReturn = (recordCount == 0) ? NONE : OK; } return outcomeToReturn; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index a752c7e..9165850 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -40,7 +40,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { // First batch after build schema phase private boolean first = true; - private boolean firstBatchForSchema = true; // true if the current batch came in with an OK_NEW_SCHEMA. + private boolean firstBatchForSchema = false; // true if the current batch came in with an OK_NEW_SCHEMA. 
private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set private boolean newSchema = false; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java index 75c4598..2183efa 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.physical.impl.agg; import org.apache.drill.categories.OperatorTest; +import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome; @@ -33,6 +34,8 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.ArrayList; + import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; import static org.junit.Assert.assertEquals; @@ -42,6 +45,7 @@ import static org.junit.Assert.assertTrue; public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome { //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestStreamingAggEmitOutcome.class); protected static TupleMetadata resultSchema; + protected static TupleMetadata resultSchemaNoGroupBy; @BeforeClass public static void setUpBeforeClass2() throws Exception { @@ -49,6 +53,9 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome { .add("name", TypeProtos.MinorType.VARCHAR) .addNullable("total_sum", TypeProtos.MinorType.BIGINT) .buildSchema(); + resultSchemaNoGroupBy = new 
SchemaBuilder() + .addNullable("total_sum", TypeProtos.MinorType.BIGINT) + .buildSchema(); } /** @@ -611,4 +618,550 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome { assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); } + + /******************************************************* + * Tests for EMIT with empty batches and no group by + * (Tests t1-t8 are repeated with no group by) + *******************************************************/ + + + /** + * Repeats t1_testStreamingAggrEmptyBatchEmitOutcome with no group by + */ + @Test + public void t11_testStreamingAggrEmptyBatchEmitOutcome() { + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + + inputOutcomes.add(OK_NEW_SCHEMA); + inputOutcomes.add(OK_NEW_SCHEMA); + inputOutcomes.add(EMIT); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + new ArrayList<NamedExpression>(), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + } + + /** + Repeats t2_testStreamingAggrNonEmptyBatchEmitOutcome with no group by + */ + @Test + public void t12_testStreamingAggrNonEmptyBatchEmitOutcome() { + final RowSet.SingleRowSet 
nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(13, 130, "item13") + .addRow(13, 130, "item13") + .addRow(2, 20, "item2") + .addRow(2, 20, "item2") + .addRow(4, 40, "item4") + .build(); + + final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy) + .addRow((long)385) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + new ArrayList<NamedExpression>(), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + // Data before EMIT is returned with an OK_NEW_SCHEMA. 
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(1, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet).verify(actualRowSet); + + // EMIT comes with an empty batch + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + // Release memory for row sets + nonEmptyInputRowSet2.clear(); + expectedRowSet.clear(); + } + + /** + Repeats t3_testStreamingAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome with no group by + */ + @Test + public void t13_testStreamingAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome() { + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(13, 130, "item13") + .addRow(0, 1300, "item13") + .addRow(2, 20, "item2") + .addRow(0, 2000, "item2") + .addRow(4, 40, "item4") + .addRow(0, 4000, "item4") + .build(); + + final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy) + .addRow((long)7509) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + new ArrayList<NamedExpression>(), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(1, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + // Release memory for row sets + nonEmptyInputRowSet2.clear(); + expectedRowSet.clear(); + } + + /** + Repeats t4_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome with no group by + */ + @Test + public void t14_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() { + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(13, 130, "item13") + .addRow(0, 0, "item13") + .addRow(1, 33000, "item13") + .addRow(2, 20, "item2") + .addRow(0, 0, "item2") + .addRow(1, 11000, "item2") + .addRow(4, 40, "item4") + .build(); + + final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy) + .addRow((long)44211) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + 
inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + new ArrayList<NamedExpression>(), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(1, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(1, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(1, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + // Release memory for row sets + nonEmptyInputRowSet2.clear(); + expectedRowSet.clear(); + } + + /** + Repeats t5_testStreamingAgrResetsAfterFirstEmitOutcome with no group by + */ + @Test + public void t15_testStreamingAgrResetsAfterFirstEmitOutcome() { + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 20, "item2") + .addRow(2, 20, "item2") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .addRow(3, 
30, "item3") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .build(); + + final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy) + .addRow((long)11) + .build(); + + final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy) + .addRow((long)374) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + inputContainer.add(emptyInputRowSet.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + new ArrayList<NamedExpression>(), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(1, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet1).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(1, strAggBatch.getRecordCount()); + + 
actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet2).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + // Release memory for row sets + nonEmptyInputRowSet2.clear(); + expectedRowSet2.clear(); + expectedRowSet1.clear(); + } + + /** + Repeats t6_testStreamingAggrOkFollowedByNone with no group by + */ + @Test + public void t16_testStreamingAggrOkFollowedByNone() { + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 20, "item2") + .addRow(3, 30, "item3") + .addRow(4, 40, "item4") + .addRow(4, 40, "item4") + .addRow(5, 50, "item5") + .addRow(5, 50, "item5") + .build(); + + final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy) + .addRow((long)11) + .build(); + + final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy) + .addRow((long)253) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + new ArrayList<NamedExpression>(), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == 
RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(1, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet1).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK); + assertEquals(1, strAggBatch.getRecordCount()); + + actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet2).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + // Release memory for row sets + nonEmptyInputRowSet2.clear(); + expectedRowSet2.clear(); + expectedRowSet1.clear(); + } + + /** + Repeats t7_testStreamingAggrMultipleEMITOutcome with no group by + */ + @Test + public void t17_testStreamingAggrMultipleEMITOutcome() { + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 20, "item2") + .addRow(3, 30, "item3") + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + inputContainer.add(emptyInputRowSet.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + new 
ArrayList<NamedExpression>(), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(1, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(1, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(1, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + nonEmptyInputRowSet2.clear(); + } + + /** + Repeats t8_testStreamingAggrMultipleInputToSingleOutputBatch with no group by + */ + @Test + public void t18_testStreamingAggrMultipleInputToSingleOutputBatch() { + + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 20, "item2") + .build(); + + final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy) + .addRow((long)33) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + inputContainer.add(emptyInputRowSet.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, 
inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + new ArrayList<NamedExpression>(), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(1, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + nonEmptyInputRowSet2.clear(); + } + + + /** + Repeats t9_testStreamingAgr_WithEmptyNonEmptyBatchesAndOKOutcome with no group by + */ + @Test + public void t19_testStreamingAgr_WithEmptyNonEmptyBatchesAndOKOutcome() { + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 20, "item1") + .addRow(13, 130, "item13") + .addRow(13, 130, "item13") + .addRow(13, 130, "item13") + .addRow(130, 1300, "item130") + .addRow(0, 0, "item130") + .build(); + + final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(23, 230, "item23") + .addRow(3, 33, "item3") + .addRow(7, 70, "item7") + .addRow(17, 170, "item7") + .build(); + + final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy) + .addRow((long)2445) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + 
inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet3.container()); + inputContainer.add(emptyInputRowSet.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + new ArrayList<NamedExpression>(), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(1, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + nonEmptyInputRowSet2.clear(); + nonEmptyInputRowSet3.clear(); + expectedRowSet.clear(); + } + + /** + Repeats t10_testStreamingAggrWithEmptyDataSet with no group by + */ + @Test + public void t20_testStreamingAggrWithEmptyDataSet() { + inputContainer.add(emptyInputRowSet.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final 
StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + new ArrayList<NamedExpression>(), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + } + }
