This is an automated email from the ASF dual-hosted git repository.
sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 9b3be79 DRILL-6440: Unnest unit tests and fixes for stats
9b3be79 is described below
commit 9b3be792678afeb5f0c87ba4c4a39afe97f20e98
Author: Parth Chandra <[email protected]>
AuthorDate: Mon Apr 16 16:42:20 2018 -0700
DRILL-6440: Unnest unit tests and fixes for stats
- Add unit test with mock input, nested lateral and unnest and project.
- Fix unit test involving batch limits; un-ignore (re-enable) map tests
- Fix input row count stats.
closes #1283
---
.../physical/impl/unnest/UnnestRecordBatch.java | 8 +-
.../impl/unnest/TestUnnestCorrectness.java | 3 +-
.../unnest/TestUnnestWithLateralCorrectness.java | 280 +++++++++++++++++++--
3 files changed, 267 insertions(+), 24 deletions(-)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 57a0ade..e985c4d 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -207,6 +207,9 @@ public class UnnestRecordBatch extends
AbstractTableFunctionRecordBatch<UnnestPO
} finally {
stats.stopSetup();
}
+ // since we never called next on an upstream operator, incoming stats are
+ // not updated. update input stats explicitly.
+ stats.batchReceived(0, incoming.getRecordCount(), true);
return IterOutcome.OK_NEW_SCHEMA;
} else {
assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA";
@@ -223,11 +226,13 @@ public class UnnestRecordBatch extends
AbstractTableFunctionRecordBatch<UnnestPO
context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
+ stats.batchReceived(0, incoming.getRecordCount(), true);
return OK_NEW_SCHEMA;
}
if (lateral.getRecordIndex() == 0) {
unnest.resetGroupIndex();
}
+ stats.batchReceived(0, incoming.getRecordCount(), false);
return doWork();
}
@@ -348,8 +353,7 @@ public class UnnestRecordBatch extends
AbstractTableFunctionRecordBatch<UnnestPO
recordCount = 0;
final List<TransferPair> transfers = Lists.newArrayList();
- final FieldReference fieldReference =
- new FieldReference(popConfig.getColumn());
+ final FieldReference fieldReference = new
FieldReference(popConfig.getColumn());
final TransferPair transferPair =
getUnnestFieldTransferPair(fieldReference);
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
index 137966b..c04bff7 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
@@ -615,7 +615,8 @@ import static org.junit.Assert.assertTrue;
* ]
* }
*
- * @see TestResultSetLoaderMapArray TestResultSetLoaderMapArray for similar
schema and data
+ * @see
org.apache.drill.exec.physical.rowSet.impl.TestResultSetLoaderMapArray
TestResultSetLoaderMapArray for
+ * similar schema and data
* @return TupleMetadata corresponding to the schema
*/
private TupleMetadata getRepeatedMapSchema() {
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index 9318c51..f281964 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -48,7 +48,6 @@ import org.apache.drill.test.rowSet.RowSetBuilder;
import org.apache.drill.test.rowSet.schema.SchemaBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -147,7 +146,6 @@ public class TestUnnestWithLateralCorrectness extends
SubOperatorTest {
}
- @Ignore("RecordBatchSizer throws Exception in RecordBatchSizer.expandMap")
@Test
public void testUnnestMapColumn() {
@@ -297,15 +295,21 @@ public class TestUnnestWithLateralCorrectness extends
SubOperatorTest {
}
- @Ignore ("Batch limits need to be sync'd with tthe record batch sizer. Fix
once the calulations are stabilized")
@Test
public void testUnnestLimitBatchSize() {
- final int limitedOutputBatchSize = 1024;
- final int inputBatchSize = 1024+1;
- final int limitedOutputBatchSizeBytes = 1024*(4 + 4 + 4 * inputBatchSize);
// num rows * (size of int + size of
-
// int + size of int * num entries in
-
// array)
+ final int limitedOutputBatchSize = 127;
+ final int inputBatchSize = limitedOutputBatchSize + 1;
+ // size of lateral output batch = 4N * (N + 5) bytes, where N = output
batch row count
+ // Lateral output batch size = N * input row size + N * size of single
unnest column
+ // = N * (size of row id + size of array
offset vector + (N + 1 )*size of single array entry))
+ // + N * 4
+ // = N * (4 + 2*4 + (N+1)*4 ) + N * 4
+ // = N * (16 + 4N) + N * 4
+ // = 4N * (N + 5)
+ // configure the output batch size to be one more record than that so that
the batch sizer can round down
+ final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize *
(limitedOutputBatchSize + 6);
+
// single record batch with single row. The unnest column has one
// more record than the batch size we want in the output
Object[][] data = new Object[1][1];
@@ -360,11 +364,10 @@ public class TestUnnestWithLateralCorrectness extends
SubOperatorTest {
// similar to previous test; we split a record across more than one batch.
// but we also set a limit less than the size of the batch so only one
batch gets output.
- final int limitedOutputBatchSize = 1024;
- final int inputBatchSize = 1024+1;
- final int limitedOutputBatchSizeBytes = 1024*(4 + 4 + 4 * inputBatchSize);
// num rows * (size of int + size of
-
// int + size of int * num entries in
-
// array)
+ final int limitedOutputBatchSize = 127;
+ final int inputBatchSize = limitedOutputBatchSize + 1;
+ final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize *
(limitedOutputBatchSize + 6);
+
// single record batch with single row. The unnest column has one
// more record than the batch size we want in the output
Object[][] data = new Object[1][1];
@@ -419,11 +422,10 @@ public class TestUnnestWithLateralCorrectness extends
SubOperatorTest {
// similar to previous test but the size of the array fits exactly into
the record batch;
- final int limitedOutputBatchSize = 1024;
- final int inputBatchSize = 1024;
- final int limitedOutputBatchSizeBytes = 1024*(4 + 4 + 4 * inputBatchSize);
// num rows * (size of int + size of
-
// int + size of int * num entries in
-
// array)
+ final int limitedOutputBatchSize = 127;
+ final int inputBatchSize = limitedOutputBatchSize + 1;
+ final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize *
(limitedOutputBatchSize + 6);
+
// single record batch with single row. The unnest column has one
// more record than the batch size we want in the output
Object[][] data = new Object[1][1];
@@ -682,15 +684,16 @@ public class TestUnnestWithLateralCorrectness extends
SubOperatorTest {
* ]
* }
*
- * @see TestResultSetLoaderMapArray TestResultSetLoaderMapArray for similar
schema and data
+ * @see
org.apache.drill.exec.physical.rowSet.impl.TestResultSetLoaderMapArray
TestResultSetLoaderMapArray for
+ * similar schema and data
* @return TupleMetadata corresponding to the schema
*/
private TupleMetadata getRepeatedMapSchema() {
TupleMetadata schema = new SchemaBuilder()
.add("rowNum", TypeProtos.MinorType.INT)
.addMapArray("unnestColumn")
- .add("colA", TypeProtos.MinorType.INT)
- .addArray("colB", TypeProtos.MinorType.VARCHAR)
+ .add("colA", TypeProtos.MinorType.INT)
+ .addArray("colB", TypeProtos.MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
return schema;
@@ -742,6 +745,30 @@ public class TestUnnestWithLateralCorrectness extends
SubOperatorTest {
return d;
}
+ private Object[][][] getNestedMapBaseline() {
+
+ Object[][][] d = {
+ {
+ {2,2,2,2,3,3,3,3,4,4,4,4},
+ {
+ "1.1.1",
+ "1.1.2",
+ "1.2.1",
+ "1.2.2",
+ "2.1.1",
+ "2.1.2",
+ "2.3.1",
+ "2.3.2",
+ "3.1.1",
+ "3.1.2",
+ "3.2.1",
+ "3.2.2"
+ }
+ }
+ };
+ return d;
+ }
+
private boolean compareMapBaseline(Object baselineValue, Object vector) {
String vv = vector.toString();
String b = (String)baselineValue;
@@ -769,5 +796,216 @@ public class TestUnnestWithLateralCorrectness extends
SubOperatorTest {
== RecordBatch.IterOutcome.OUT_OF_MEMORY);
}
+
+ /**
+ * Run a plan like the following for various input batches :
+ * Lateral1
+ * / \
+ * / Lateral2
+ * Scan / \
+ * / \
+ * Project1 Project2
+ * / \
+ * / \
+ * Unnest1 Unnest2
+ *
+ *
+ * @param incomingSchemas
+ * @param iterOutcomes
+ * @param execKill
+ * @param data
+ * @param baseline
+ * @param <T>
+ * @throws Exception
+ */
+
+
+ private <T> void testNestedUnnest( TupleMetadata[] incomingSchemas,
+ RecordBatch.IterOutcome[] iterOutcomes,
+ int execKill, // number of batches after which to kill the execution (!)
+ T[][] data,
+ T[][][] baseline) throws Exception {
+
+ // Get the incoming container with dummy data for LJ
+ final List<VectorContainer> incomingContainer = new
ArrayList<>(data.length);
+
+ // Create data
+ ArrayList<RowSet.SingleRowSet> rowSets = new ArrayList<>();
+ int rowNumber = 0;
+ int batchNum = 0;
+ for ( Object[] recordBatch : data) {
+ RowSetBuilder rowSetBuilder =
fixture.rowSetBuilder(incomingSchemas[batchNum]);
+ for ( Object rowData : recordBatch) {
+ rowSetBuilder.addRow(++rowNumber, rowData);
+ }
+ RowSet.SingleRowSet rowSet = rowSetBuilder.build();
+ rowSets.add(rowSet);
+ incomingContainer.add(rowSet.container());
+ batchNum++;
+ }
+
+ // Get the unnest POPConfig
+ final UnnestPOP unnestPopConfig1 = new UnnestPOP(null,
SchemaPath.getSimplePath("unnestColumn"));
+ final UnnestPOP unnestPopConfig2 = new UnnestPOP(null,
SchemaPath.getSimplePath("colB"));
+
+ // Get the IterOutcomes for LJ
+ final List<RecordBatch.IterOutcome> outcomes = new
ArrayList<>(iterOutcomes.length);
+ for(RecordBatch.IterOutcome o : iterOutcomes) {
+ outcomes.add(o);
+ }
+
+ // Create incoming MockRecordBatch
+ final MockRecordBatch incomingMockBatch =
+ new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
incomingContainer, outcomes,
+ incomingContainer.get(0).getSchema());
+
+ // setup Unnest record batch
+ final UnnestRecordBatch unnestBatch1 =
+ new UnnestRecordBatch(unnestPopConfig1, fixture.getFragmentContext());
+ final UnnestRecordBatch unnestBatch2 =
+ new UnnestRecordBatch(unnestPopConfig2, fixture.getFragmentContext());
+
+ // Create intermediate Project
+ final Project projectPopConfig1 =
+ new Project(DrillLogicalTestutils.parseExprs("unnestColumn.colB",
"colB"), unnestPopConfig1);
+ final ProjectRecordBatch projectBatch1 =
+ new ProjectRecordBatch(projectPopConfig1, unnestBatch1,
fixture.getFragmentContext());
+ final Project projectPopConfig2 =
+ new Project(DrillLogicalTestutils.parseExprs("colB", "unnestColumn2"),
unnestPopConfig2);
+ final ProjectRecordBatch projectBatch2 =
+ new ProjectRecordBatch(projectPopConfig2, unnestBatch2,
fixture.getFragmentContext());
+
+ final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1,
projectPopConfig2, JoinRelType.FULL);
+ final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig,
ljPopConfig2, JoinRelType.FULL);
+
+ final LateralJoinBatch lateralJoinBatch2 =
+ new LateralJoinBatch(ljPopConfig2, fixture.getFragmentContext(),
projectBatch1, projectBatch2);
+ final LateralJoinBatch lateralJoinBatch1 =
+ new LateralJoinBatch(ljPopConfig1, fixture.getFragmentContext(),
incomingMockBatch, lateralJoinBatch2);
+
+ // set pointer to Lateral in unnest
+ unnestBatch1.setIncoming((LateralContract) lateralJoinBatch1);
+ unnestBatch2.setIncoming((LateralContract) lateralJoinBatch2);
+
+ // Simulate the pipeline by calling next on the incoming
+
+ // results is an array ot batches, each batch being an array of output
vectors.
+ List<List<ValueVector> > resultList = new ArrayList<>();
+ List<List<ValueVector> > results = null;
+ int batchesProcessed = 0;
+ try{
+ try {
+ while (!isTerminal(lateralJoinBatch1.next())) {
+ if (lateralJoinBatch1.getRecordCount() > 0) {
+ addBatchToResults(resultList, lateralJoinBatch1);
+ }
+ batchesProcessed++;
+ if (batchesProcessed == execKill) {
+ lateralJoinBatch1.getContext().getExecutorState().fail(new
DrillException("Testing failure of execution."));
+ lateralJoinBatch1.kill(true);
+ }
+ // else nothing to do
+ }
+ } catch (UserException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new Exception ("Test failed to execute lateralJoinBatch.next()
because: " + e.getMessage());
+ }
+
+ // Check results against baseline
+ results = resultList;
+
+ int batchIndex = 0;
+ int vectorIndex = 0;
+ //int valueIndex = 0;
+ for ( List<ValueVector> batch: results) {
+ int vectorCount= batch.size();
+ if (vectorCount!= baseline[batchIndex].length+2) { // baseline does
not include the original unnest column(s)
+ fail("Test failed in validating unnest output. Batch column count
mismatch.");
+ }
+ for (ValueVector vv : batch) {
+ if(vv.getField().getName().equals("unnestColumn") ||
vv.getField().getName().equals("colB")) {
+ continue; // skip the original input column
+ }
+ int valueCount = vv.getAccessor().getValueCount();
+ if (valueCount!= baseline[batchIndex][vectorIndex].length) {
+ fail("Test failed in validating unnest output. Value count
mismatch in batch number " + (batchIndex+1) +""
+ + ".");
+ }
+
+ for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
+ if (vv instanceof MapVector) {
+ if
(!compareMapBaseline(baseline[batchIndex][vectorIndex][valueIndex], vv
+ .getAccessor()
+ .getObject(valueIndex))) {
+ fail("Test failed in validating unnest(Map) output. Value
mismatch");
+ }
+ } else if (vv instanceof VarCharVector) {
+ Object val = vv.getAccessor().getObject(valueIndex);
+ if (((String)
baseline[batchIndex][vectorIndex][valueIndex]).compareTo(val.toString()) != 0) {
+ fail("Test failed in validating unnest output. Value mismatch.
Baseline value[]" + vectorIndex + "][" + valueIndex
+ + "]" + ": " + baseline[vectorIndex][valueIndex] + "
VV.getObject(valueIndex): " + val);
+ }
+ } else {
+ Object val = vv.getAccessor().getObject(valueIndex);
+ if (!baseline[batchIndex][vectorIndex][valueIndex].equals(val)) {
+ fail("Test failed in validating unnest output. Value mismatch.
Baseline value[" + vectorIndex + "][" + valueIndex
+ + "]" + ": "
+ +
((Object[])baseline[batchIndex][vectorIndex])[valueIndex] + "
VV.getObject(valueIndex): " + val);
+ }
+ }
+ }
+ vectorIndex++;
+ }
+ vectorIndex=0;
+ batchIndex++;
+ }
+ } catch (UserException e) {
+ throw e; // Valid exception
+ } catch (Exception e) {
+ fail("Test failed. Exception : " + e.getMessage());
+ } finally {
+ // Close all the resources for this test case
+ unnestBatch1.close();
+ lateralJoinBatch1.close();
+ unnestBatch2.close();
+ lateralJoinBatch2.close();
+ incomingMockBatch.close();
+
+ if (results != null) {
+ for (List<ValueVector> batch : results) {
+ for (ValueVector vv : batch) {
+ vv.clear();
+ }
+ }
+ }
+ for(RowSet.SingleRowSet rowSet: rowSets) {
+ rowSet.clear();
+ }
+ }
+
+ }
+
+  @Test
+  public void testNestedUnnestMapColumn() throws Exception {
+    Object[][] data = getMapData();
+
+    // One schema and one scan outcome per incoming batch.
+    // NOTE(review): assumes getMapData() yields exactly two batches — confirm.
+    TupleMetadata incomingSchema = getRepeatedMapSchema();
+    TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema};
+
+    Object[][][] baseline = getNestedMapBaseline();
+
+    RecordBatch.IterOutcome[] iterOutcomes =
+        {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
+
+    // execKill = 0 never kills the execution. Exceptions propagate to JUnit so the full
+    // stack trace is reported instead of only the exception message.
+    testNestedUnnest(incomingSchemas, iterOutcomes, 0, data, baseline);
+  }
+
}
--
To stop receiving notification emails like this one, please contact
[email protected].