[ 
https://issues.apache.org/jira/browse/DRILL-6654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568973#comment-16568973
 ] 

ASF GitHub Bot commented on DRILL-6654:
---------------------------------------

ilooner closed pull request #1418: DRILL-6654: Data verification failure with 
lateral unnest query havin…
URL: https://github.com/apache/drill/pull/1418
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 916585088f1..f30616bacd4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -136,12 +136,14 @@ public AggOutcome doWork(IterOutcome outerOutcome) {
                   outcome = out;
                   return AggOutcome.RETURN_OUTCOME;
                 case EMIT:
+                  outerOutcome = EMIT;
                   if (incoming.getRecordCount() == 0) {
                     // When we see an EMIT we let the  agg record batch know 
that it should either
                     // send out an EMIT or an OK_NEW_SCHEMA, followed by an 
EMIT. To do that we simply return
                     // RETURN_AND_RESET with the outcome so the record batch 
can take care of it.
                     return setOkAndReturnEmit();
                   } else {
+                    currentIndex = this.getVectorIndex(underlyingIndex);
                     break outer;
                   }
 
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
index 2183efa1db5..cead984cc5c 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
@@ -24,6 +24,7 @@
 import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.test.rowSet.DirectRowSet;
@@ -1164,4 +1165,140 @@ public void t20_testStreamingAggrWithEmptyDataSet() {
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
   }
 
+  /**
+   Repeats t21_testStreamingAggrRunsofEmpty_NonEmpty_Sv2 with no group by
+   */
+  @Test
+  public void t21_testStreamingAggrRunsofEmpty_NonEmpty_Sv2() {
+    TupleMetadata inputSchema_sv2 = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .withSVMode(BatchSchema.SelectionVectorMode.TWO_BYTE)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyRowSet_Sv2 = 
operatorFixture.rowSetBuilder(inputSchema_sv2)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema_sv2)
+      .addSelection(false, 2, 20, "item2")
+      .addSelection(true, 3, 30, "item3")
+      .withSv2()
+      .build();
+
+    inputContainer.add(emptyRowSet_Sv2.container());
+    inputContainer.add(emptyRowSet_Sv2.container());
+    inputContainer.add(emptyRowSet_Sv2.container());
+    inputContainer.add(emptyRowSet_Sv2.container());
+    inputContainer.add(emptyRowSet_Sv2.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+    inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+    inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+    inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+    inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, inputContainerSv2, 
inputContainer.get(0).getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      new ArrayList<NamedExpression>(),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    final RowSet.SingleRowSet expectedRowSet = 
operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+      .addRow((long)33)
+      .build();
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // For special batch.
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = 
DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    nonEmptyInputRowSet2.clear();
+    emptyRowSet_Sv2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   Repeats t22_testStreamingAggrRunsOfEmpty_NonEmpty with no group by
+   */
+  @Test
+  public void t22_testStreamingAggrRunsOfEmpty_NonEmpty() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final RowSet.SingleRowSet expectedRowSet = 
operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+      .build();
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, inputContainer.get(0).getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      new ArrayList<NamedExpression>(),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = 
DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Data verification failure with lateral unnest query having filter in and 
> order by
> ---------------------------------------------------------------------------------
>
>                 Key: DRILL-6654
>                 URL: https://issues.apache.org/jira/browse/DRILL-6654
>             Project: Apache Drill
>          Issue Type: Bug
>    Affects Versions: 1.14.0
>            Reporter: Kedar Sankar Behera
>            Assignee: Sorabh Hamirwasia
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.15.0
>
>         Attachments: Lateral Parquet.pdf, Lateral json.pdf, flatten.pdf
>
>
> Data verification failure with a lateral unnest query having an IN filter and
> an ORDER BY clause.
> lateral query - 
> {code}
> select customer.c_custkey, customer.c_name, orders.totalprice from customer, 
> lateral (select sum(t.o.o_totalprice) as totalprice from 
> unnest(customer.c_orders) t(o) WHERE t.o.o_totalprice in 
> (89230.03,270087.44,246408.53,82657.72,153941.38,65277.06,180309.76)) orders 
> order by customer.c_custkey limit 50;
> {code}
> result :-
> {code}
> +------------+---------------------+-------------+
> | c_custkey | c_name | totalprice |
> +------------+---------------------+-------------+
> | 101276 | Customer#000101276 | 82657.72 |
> | 120295 | Customer#000120295 | 266119.96 |
> | 120376 | Customer#000120376 | 180309.76 |
> +------------+---------------------+-------------+
> {code}
> flatten query -
> {code}
> select f.c_custkey, f.c_name, sum(f.o.o_totalprice) from (select c_custkey, 
> c_name, flatten(c_orders) as o from customer) f WHERE f.o.o_totalprice in 
> (89230.03,270087.44,246408.53,82657.72,153941.38,65277.06,180309.76) group by 
> f.c_custkey, f.c_name order by f.c_custkey limit 50;
> {code}
> result :-
> {code}
> +------------+---------------------+------------+
> | c_custkey | c_name | EXPR$2 |
> +------------+---------------------+------------+
> | 101276 | Customer#000101276 | 82657.72 |
> | 120376 | Customer#000120376 | 180309.76 |
> +------------+---------------------+------------+
> {code}
> PS: The above results are for Parquet-type data. The same query for JSON
> data gives an identical result, as follows:
> {code}
> +------------+---------------------+------------+
> | c_custkey | c_name | EXPR$2 |
> +------------+---------------------+------------+
> | 101276 | Customer#000101276 | 82657.72 |
> | 120376 | Customer#000120376 | 180309.76 |
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to