[
https://issues.apache.org/jira/browse/DRILL-6440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488033#comment-16488033
]
ASF GitHub Bot commented on DRILL-6440:
---------------------------------------
sohami closed pull request #1283: DRILL-6440: Unnest unit tests and fixes for stats
URL: https://github.com/apache/drill/pull/1283
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 57a0adeb7c..e985c4defe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -207,6 +207,9 @@ public IterOutcome innerNext() {
} finally {
stats.stopSetup();
}
+ // since we never called next on an upstream operator, incoming stats are
+ // not updated. update input stats explicitly.
+ stats.batchReceived(0, incoming.getRecordCount(), true);
return IterOutcome.OK_NEW_SCHEMA;
} else {
assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA";
@@ -223,11 +226,13 @@ public IterOutcome innerNext() {
context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
+ stats.batchReceived(0, incoming.getRecordCount(), true);
return OK_NEW_SCHEMA;
}
if (lateral.getRecordIndex() == 0) {
unnest.resetGroupIndex();
}
+ stats.batchReceived(0, incoming.getRecordCount(), false);
return doWork();
}
@@ -348,8 +353,7 @@ protected IterOutcome doWork() {
recordCount = 0;
final List<TransferPair> transfers = Lists.newArrayList();
- final FieldReference fieldReference =
- new FieldReference(popConfig.getColumn());
+ final FieldReference fieldReference = new FieldReference(popConfig.getColumn());
final TransferPair transferPair =
getUnnestFieldTransferPair(fieldReference);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
index 137966ba33..c04bff7753 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
@@ -615,7 +615,8 @@ public void testUnnestNonArrayColumn() {
* ]
* }
*
- * @see TestResultSetLoaderMapArray TestResultSetLoaderMapArray for similar schema and data
+ * @see org.apache.drill.exec.physical.rowSet.impl.TestResultSetLoaderMapArray TestResultSetLoaderMapArray for
+ * similar schema and data
* @return TupleMetadata corresponding to the schema
*/
private TupleMetadata getRepeatedMapSchema() {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index 9318c516b9..f281964fb4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -48,7 +48,6 @@
import org.apache.drill.test.rowSet.schema.SchemaBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -147,7 +146,6 @@ public void testUnnestVarWidthColumn() {
}
- @Ignore("RecordBatchSizer throws Exception in RecordBatchSizer.expandMap")
@Test
public void testUnnestMapColumn() {
@@ -297,15 +295,21 @@ public void testUnnestSchemaChange() {
}
- @Ignore ("Batch limits need to be sync'd with tthe record batch sizer. Fix
once the calulations are stabilized")
@Test
public void testUnnestLimitBatchSize() {
- final int limitedOutputBatchSize = 1024;
- final int inputBatchSize = 1024+1;
- final int limitedOutputBatchSizeBytes = 1024*(4 + 4 + 4 * inputBatchSize); // num rows * (size of int + size of
- // int + size of int * num entries in
- // array)
+ final int limitedOutputBatchSize = 127;
+ final int inputBatchSize = limitedOutputBatchSize + 1;
+ // size of lateral output batch = 4N * (N + 5) bytes, where N = output batch row count
+ // Lateral output batch size = N * input row size + N * size of single unnest column
+ // = N * (size of row id + size of array offset vector + (N + 1 )*size of single array entry))
+ // + N * 4
+ // = N * (4 + 2*4 + (N+1)*4 ) + N * 4
+ // = N * (16 + 4N) + N * 4
+ // = 4N * (N + 5)
+ // configure the output batch size to be one more record than that so that the batch sizer can round down
+ final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6);
+
// single record batch with single row. The unnest column has one
// more record than the batch size we want in the output
Object[][] data = new Object[1][1];
@@ -360,11 +364,10 @@ public void testUnnestKillFromLimitSubquery1() {
// similar to previous test; we split a record across more than one batch.
// but we also set a limit less than the size of the batch so only one batch gets output.
- final int limitedOutputBatchSize = 1024;
- final int inputBatchSize = 1024+1;
- final int limitedOutputBatchSizeBytes = 1024*(4 + 4 + 4 * inputBatchSize); // num rows * (size of int + size of
- // int + size of int * num entries in
- // array)
+ final int limitedOutputBatchSize = 127;
+ final int inputBatchSize = limitedOutputBatchSize + 1;
+ final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6);
+
// single record batch with single row. The unnest column has one
// more record than the batch size we want in the output
Object[][] data = new Object[1][1];
@@ -419,11 +422,10 @@ public void testUnnestKillFromLimitSubquery2() {
// similar to previous test but the size of the array fits exactly into the record batch;
- final int limitedOutputBatchSize = 1024;
- final int inputBatchSize = 1024;
- final int limitedOutputBatchSizeBytes = 1024*(4 + 4 + 4 * inputBatchSize); // num rows * (size of int + size of
- // int + size of int * num entries in
- // array)
+ final int limitedOutputBatchSize = 127;
+ final int inputBatchSize = limitedOutputBatchSize + 1;
+ final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6);
+
// single record batch with single row. The unnest column has one
// more record than the batch size we want in the output
Object[][] data = new Object[1][1];
@@ -682,15 +684,16 @@ public void testUnnestNonArrayColumn() {
* ]
* }
*
- * @see TestResultSetLoaderMapArray TestResultSetLoaderMapArray for similar schema and data
+ * @see org.apache.drill.exec.physical.rowSet.impl.TestResultSetLoaderMapArray TestResultSetLoaderMapArray for
+ * similar schema and data
* @return TupleMetadata corresponding to the schema
*/
private TupleMetadata getRepeatedMapSchema() {
TupleMetadata schema = new SchemaBuilder()
.add("rowNum", TypeProtos.MinorType.INT)
.addMapArray("unnestColumn")
- .add("colA", TypeProtos.MinorType.INT)
- .addArray("colB", TypeProtos.MinorType.VARCHAR)
+ .add("colA", TypeProtos.MinorType.INT)
+ .addArray("colB", TypeProtos.MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
return schema;
@@ -742,6 +745,30 @@ private TupleMetadata getRepeatedMapSchema() {
return d;
}
+ private Object[][][] getNestedMapBaseline() {
+
+ Object[][][] d = {
+ {
+ {2,2,2,2,3,3,3,3,4,4,4,4},
+ {
+ "1.1.1",
+ "1.1.2",
+ "1.2.1",
+ "1.2.2",
+ "2.1.1",
+ "2.1.2",
+ "2.3.1",
+ "2.3.2",
+ "3.1.1",
+ "3.1.2",
+ "3.2.1",
+ "3.2.2"
+ }
+ }
+ };
+ return d;
+ }
+
private boolean compareMapBaseline(Object baselineValue, Object vector) {
String vv = vector.toString();
String b = (String)baselineValue;
@@ -769,5 +796,216 @@ private boolean isTerminal(RecordBatch.IterOutcome outcome) {
== RecordBatch.IterOutcome.OUT_OF_MEMORY);
}
+
+ /**
+ * Run a plan like the following for various input batches :
+ * Lateral1
+ * / \
+ * / Lateral2
+ * Scan / \
+ * / \
+ * Project1 Project2
+ * / \
+ * / \
+ * Unnest1 Unnest2
+ *
+ *
+ * @param incomingSchemas
+ * @param iterOutcomes
+ * @param execKill
+ * @param data
+ * @param baseline
+ * @param <T>
+ * @throws Exception
+ */
+
+
+ private <T> void testNestedUnnest( TupleMetadata[] incomingSchemas,
+ RecordBatch.IterOutcome[] iterOutcomes,
+ int execKill, // number of batches after which to kill the execution (!)
+ T[][] data,
+ T[][][] baseline) throws Exception {
+
+ // Get the incoming container with dummy data for LJ
+ final List<VectorContainer> incomingContainer = new ArrayList<>(data.length);
+
+ // Create data
+ ArrayList<RowSet.SingleRowSet> rowSets = new ArrayList<>();
+ int rowNumber = 0;
+ int batchNum = 0;
+ for ( Object[] recordBatch : data) {
+ RowSetBuilder rowSetBuilder = fixture.rowSetBuilder(incomingSchemas[batchNum]);
+ for ( Object rowData : recordBatch) {
+ rowSetBuilder.addRow(++rowNumber, rowData);
+ }
+ RowSet.SingleRowSet rowSet = rowSetBuilder.build();
+ rowSets.add(rowSet);
+ incomingContainer.add(rowSet.container());
+ batchNum++;
+ }
+
+ // Get the unnest POPConfig
+ final UnnestPOP unnestPopConfig1 = new UnnestPOP(null, SchemaPath.getSimplePath("unnestColumn"));
+ final UnnestPOP unnestPopConfig2 = new UnnestPOP(null, SchemaPath.getSimplePath("colB"));
+
+ // Get the IterOutcomes for LJ
+ final List<RecordBatch.IterOutcome> outcomes = new ArrayList<>(iterOutcomes.length);
+ for(RecordBatch.IterOutcome o : iterOutcomes) {
+ outcomes.add(o);
+ }
+
+ // Create incoming MockRecordBatch
+ final MockRecordBatch incomingMockBatch =
+ new MockRecordBatch(fixture.getFragmentContext(), operatorContext, incomingContainer, outcomes,
+ incomingContainer.get(0).getSchema());
+
+ // setup Unnest record batch
+ final UnnestRecordBatch unnestBatch1 =
+ new UnnestRecordBatch(unnestPopConfig1, fixture.getFragmentContext());
+ final UnnestRecordBatch unnestBatch2 =
+ new UnnestRecordBatch(unnestPopConfig2, fixture.getFragmentContext());
+
+ // Create intermediate Project
+ final Project projectPopConfig1 =
+ new Project(DrillLogicalTestutils.parseExprs("unnestColumn.colB",
"colB"), unnestPopConfig1);
+ final ProjectRecordBatch projectBatch1 =
+ new ProjectRecordBatch(projectPopConfig1, unnestBatch1,
fixture.getFragmentContext());
+ final Project projectPopConfig2 =
+ new Project(DrillLogicalTestutils.parseExprs("colB", "unnestColumn2"),
unnestPopConfig2);
+ final ProjectRecordBatch projectBatch2 =
+ new ProjectRecordBatch(projectPopConfig2, unnestBatch2,
fixture.getFragmentContext());
+
+ final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1, projectPopConfig2, JoinRelType.FULL);
+ final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig, ljPopConfig2, JoinRelType.FULL);
+
+ final LateralJoinBatch lateralJoinBatch2 =
+ new LateralJoinBatch(ljPopConfig2, fixture.getFragmentContext(), projectBatch1, projectBatch2);
+ final LateralJoinBatch lateralJoinBatch1 =
+ new LateralJoinBatch(ljPopConfig1, fixture.getFragmentContext(), incomingMockBatch, lateralJoinBatch2);
+
+ // set pointer to Lateral in unnest
+ unnestBatch1.setIncoming((LateralContract) lateralJoinBatch1);
+ unnestBatch2.setIncoming((LateralContract) lateralJoinBatch2);
+
+ // Simulate the pipeline by calling next on the incoming
+
+ // results is an array ot batches, each batch being an array of output vectors.
+ List<List<ValueVector> > resultList = new ArrayList<>();
+ List<List<ValueVector> > results = null;
+ int batchesProcessed = 0;
+ try{
+ try {
+ while (!isTerminal(lateralJoinBatch1.next())) {
+ if (lateralJoinBatch1.getRecordCount() > 0) {
+ addBatchToResults(resultList, lateralJoinBatch1);
+ }
+ batchesProcessed++;
+ if (batchesProcessed == execKill) {
+ lateralJoinBatch1.getContext().getExecutorState().fail(new DrillException("Testing failure of execution."));
+ lateralJoinBatch1.kill(true);
+ }
+ // else nothing to do
+ }
+ } catch (UserException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new Exception ("Test failed to execute lateralJoinBatch.next()
because: " + e.getMessage());
+ }
+
+ // Check results against baseline
+ results = resultList;
+
+ int batchIndex = 0;
+ int vectorIndex = 0;
+ //int valueIndex = 0;
+ for ( List<ValueVector> batch: results) {
+ int vectorCount= batch.size();
+ if (vectorCount!= baseline[batchIndex].length+2) { // baseline does not include the original unnest column(s)
+ fail("Test failed in validating unnest output. Batch column count mismatch.");
+ }
+ for (ValueVector vv : batch) {
+ if(vv.getField().getName().equals("unnestColumn") ||
vv.getField().getName().equals("colB")) {
+ continue; // skip the original input column
+ }
+ int valueCount = vv.getAccessor().getValueCount();
+ if (valueCount!= baseline[batchIndex][vectorIndex].length) {
+ fail("Test failed in validating unnest output. Value count
mismatch in batch number " + (batchIndex+1) +""
+ + ".");
+ }
+
+ for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
+ if (vv instanceof MapVector) {
+ if (!compareMapBaseline(baseline[batchIndex][vectorIndex][valueIndex], vv
+ .getAccessor()
+ .getObject(valueIndex))) {
+ fail("Test failed in validating unnest(Map) output. Value
mismatch");
+ }
+ } else if (vv instanceof VarCharVector) {
+ Object val = vv.getAccessor().getObject(valueIndex);
+ if (((String) baseline[batchIndex][vectorIndex][valueIndex]).compareTo(val.toString()) != 0) {
+ fail("Test failed in validating unnest output. Value mismatch. Baseline value[]" + vectorIndex + "][" + valueIndex
+ + "]" + ": " + baseline[vectorIndex][valueIndex] + " VV.getObject(valueIndex): " + val);
+ }
+ } else {
+ Object val = vv.getAccessor().getObject(valueIndex);
+ if (!baseline[batchIndex][vectorIndex][valueIndex].equals(val)) {
+ fail("Test failed in validating unnest output. Value mismatch.
Baseline value[" + vectorIndex + "][" + valueIndex
+ + "]" + ": "
+ +
((Object[])baseline[batchIndex][vectorIndex])[valueIndex] + "
VV.getObject(valueIndex): " + val);
+ }
+ }
+ }
+ vectorIndex++;
+ }
+ vectorIndex=0;
+ batchIndex++;
+ }
+ } catch (UserException e) {
+ throw e; // Valid exception
+ } catch (Exception e) {
+ fail("Test failed. Exception : " + e.getMessage());
+ } finally {
+ // Close all the resources for this test case
+ unnestBatch1.close();
+ lateralJoinBatch1.close();
+ unnestBatch2.close();
+ lateralJoinBatch2.close();
+ incomingMockBatch.close();
+
+ if (results != null) {
+ for (List<ValueVector> batch : results) {
+ for (ValueVector vv : batch) {
+ vv.clear();
+ }
+ }
+ }
+ for(RowSet.SingleRowSet rowSet: rowSets) {
+ rowSet.clear();
+ }
+ }
+
+ }
+
+ @Test
+ public void testNestedUnnestMapColumn() {
+
+ Object[][] data = getMapData();
+
+ // Create input schema
+ TupleMetadata incomingSchema = getRepeatedMapSchema();
+ TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema};
+
+ Object[][][] baseline = getNestedMapBaseline();
+
+ RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
+
+ try {
+ testNestedUnnest(incomingSchemas, iterOutcomes, 0, data, baseline);
+ } catch (Exception e) {
+ fail("Failed due to exception: " + e.getMessage());
+ }
+
+ }
+
}
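For reference, the 4N * (N + 5) batch-size arithmetic used in testUnnestLimitBatchSize (and the two tests that follow it) can be checked with a small standalone sketch. The class name and main() below are illustrative only and are not part of the patch; the sketch assumes 4-byte ints and a 4-byte offset-vector entry per row, as the patch comments state.

// Illustrative arithmetic check only; this class is not part of the patch.
// Assumes 4-byte ints and a 4-byte offset-vector entry per row, per the patch comments.
public class UnnestBatchSizeCheck {
  public static void main(String[] args) {
    final int n = 127; // desired output batch row count (limitedOutputBatchSize)
    // row id column + unnest column offset vector + (n + 1) array entries, plus the unnest output column
    final int lateralBatchBytes = n * (4 + 2 * 4 + (n + 1) * 4) + n * 4;
    System.out.println(lateralBatchBytes == 4 * n * (n + 5));   // true: matches the closed form in the comments
    // the tests configure 4N * (N + 6) so the batch sizer can round down
    final int limitedOutputBatchSizeBytes = 4 * n * (n + 6);
    System.out.println(limitedOutputBatchSizeBytes - lateralBatchBytes); // 508 bytes of headroom (= 4 * n)
  }
}

For N = 127 the configured limit is 67564 bytes, which leaves 4N = 508 bytes of headroom over the exact 4N * (N + 5) batch size.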
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Fix ignored unit tests in unnest
> --------------------------------
>
> Key: DRILL-6440
> URL: https://issues.apache.org/jira/browse/DRILL-6440
> Project: Apache Drill
> Issue Type: Improvement
> Reporter: Parth Chandra
> Assignee: Parth Chandra
> Priority: Major
> Labels: ready-to-commit
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)