This is an automated email from the ASF dual-hosted git repository. timothyfarkas pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 11f3c332bb4b51ad43053cb3b1fad5891bda2132 Author: Sorabh Hamirwasia <shamirwa...@maprtech.com> AuthorDate: Tue Jul 31 15:53:57 2018 -0700 DRILL-6654: Data verification failure with lateral unnest query having filter in and order by closes #1418 --- .../impl/aggregate/StreamingAggTemplate.java | 2 + .../impl/agg/TestStreamingAggEmitOutcome.java | 137 +++++++++++++++++++++ 2 files changed, 139 insertions(+) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index 9165850..f30616b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -136,12 +136,14 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { outcome = out; return AggOutcome.RETURN_OUTCOME; case EMIT: + outerOutcome = EMIT; if (incoming.getRecordCount() == 0) { // When we see an EMIT we let the agg record batch know that it should either // send out an EMIT or an OK_NEW_SCHEMA, followed by an EMIT. To do that we simply return // RETURN_AND_RESET with the outcome so the record batch can take care of it. 
return setOkAndReturnEmit(); } else { + currentIndex = this.getVectorIndex(underlyingIndex); break outer; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java index 2183efa..cead984 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java @@ -24,6 +24,7 @@ import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome; import org.apache.drill.exec.physical.impl.MockRecordBatch; import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch; +import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.test.rowSet.DirectRowSet; @@ -1164,4 +1165,140 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome { assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); } + /** + Verifies streaming aggregation with no group by over a run of empty SV2 batches followed by a non-empty SV2 batch + */ + @Test + public void t21_testStreamingAggrRunsofEmpty_NonEmpty_Sv2() { + TupleMetadata inputSchema_sv2 = new SchemaBuilder() + .add("id_left", TypeProtos.MinorType.INT) + .add("cost_left", TypeProtos.MinorType.INT) + .add("name_left", TypeProtos.MinorType.VARCHAR) + .withSVMode(BatchSchema.SelectionVectorMode.TWO_BYTE) + .buildSchema(); + + final RowSet.SingleRowSet emptyRowSet_Sv2 = operatorFixture.rowSetBuilder(inputSchema_sv2) + .withSv2() + .build(); + + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema_sv2) + .addSelection(false, 2, 20, "item2") + .addSelection(true, 3, 30, "item3") + .withSv2() + .build(); + + 
inputContainer.add(emptyRowSet_Sv2.container()); + inputContainer.add(emptyRowSet_Sv2.container()); + inputContainer.add(emptyRowSet_Sv2.container()); + inputContainer.add(emptyRowSet_Sv2.container()); + inputContainer.add(emptyRowSet_Sv2.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + inputContainerSv2.add(emptyRowSet_Sv2.getSv2()); + inputContainerSv2.add(emptyRowSet_Sv2.getSv2()); + inputContainerSv2.add(emptyRowSet_Sv2.getSv2()); + inputContainerSv2.add(emptyRowSet_Sv2.getSv2()); + inputContainerSv2.add(emptyRowSet_Sv2.getSv2()); + inputContainerSv2.add(nonEmptyInputRowSet2.getSv2()); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, inputContainerSv2, inputContainer.get(0).getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + new ArrayList<NamedExpression>(), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy) + .addRow((long)33) + .build(); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(0, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + // For special batch. 
+ assertEquals(1, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(1, strAggBatch.getRecordCount()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(1, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet).verify(actualRowSet); + + nonEmptyInputRowSet2.clear(); + emptyRowSet_Sv2.clear(); + expectedRowSet.clear(); + } + + /** + Verifies streaming aggregation with no group by over a run of empty batches followed by a non-empty batch + */ + @Test + public void t22_testStreamingAggrRunsOfEmpty_NonEmpty() { + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 20, "item2") + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy) + .build(); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, inputContainer.get(0).getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + new ArrayList<NamedExpression>(), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(0, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(1, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(1, strAggBatch.getRecordCount()); + + actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(1, strAggBatch.getRecordCount()); + + nonEmptyInputRowSet2.clear(); + expectedRowSet.clear(); + } }