This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b3be79  DRILL-6440: Unnest unit tests and fixes for stats
9b3be79 is described below

commit 9b3be792678afeb5f0c87ba4c4a39afe97f20e98
Author: Parth Chandra <[email protected]>
AuthorDate: Mon Apr 16 16:42:20 2018 -0700

    DRILL-6440: Unnest unit tests and fixes for stats
    
      - Add unit test with mock input, nested lateral and unnest and project.
      - Fix unit test involving batch limits, ignore map tests
      - Fix input row count stats.
    
    closes #1283
---
 .../physical/impl/unnest/UnnestRecordBatch.java    |   8 +-
 .../impl/unnest/TestUnnestCorrectness.java         |   3 +-
 .../unnest/TestUnnestWithLateralCorrectness.java   | 280 +++++++++++++++++++--
 3 files changed, 267 insertions(+), 24 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 57a0ade..e985c4d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -207,6 +207,9 @@ public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPO
       } finally {
         stats.stopSetup();
       }
+      // since we never called next on an upstream operator, incoming stats are
+      // not updated. update input stats explicitly.
+      stats.batchReceived(0, incoming.getRecordCount(), true);
       return IterOutcome.OK_NEW_SCHEMA;
     } else {
       assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA";
@@ -223,11 +226,13 @@ public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPO
           context.getExecutorState().fail(ex);
           return IterOutcome.STOP;
         }
+        stats.batchReceived(0, incoming.getRecordCount(), true);
         return OK_NEW_SCHEMA;
       }
       if (lateral.getRecordIndex() == 0) {
         unnest.resetGroupIndex();
       }
+      stats.batchReceived(0, incoming.getRecordCount(), false);
       return doWork();
     }
 
@@ -348,8 +353,7 @@ public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPO
     recordCount = 0;
     final List<TransferPair> transfers = Lists.newArrayList();
 
-    final FieldReference fieldReference =
-        new FieldReference(popConfig.getColumn());
+    final FieldReference fieldReference = new 
FieldReference(popConfig.getColumn());
 
     final TransferPair transferPair = 
getUnnestFieldTransferPair(fieldReference);
 
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
index 137966b..c04bff7 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
@@ -615,7 +615,8 @@ import static org.junit.Assert.assertTrue;
    *    ]
    *  }
    *
-   * @see TestResultSetLoaderMapArray TestResultSetLoaderMapArray for similar 
schema and data
+   * @see 
org.apache.drill.exec.physical.rowSet.impl.TestResultSetLoaderMapArray 
TestResultSetLoaderMapArray for
+   * similar schema and data
    * @return TupleMetadata corresponding to the schema
    */
   private TupleMetadata getRepeatedMapSchema() {
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index 9318c51..f281964 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -48,7 +48,6 @@ import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -147,7 +146,6 @@ public class TestUnnestWithLateralCorrectness extends 
SubOperatorTest {
 
   }
 
-  @Ignore("RecordBatchSizer throws Exception in RecordBatchSizer.expandMap")
   @Test
   public void testUnnestMapColumn() {
 
@@ -297,15 +295,21 @@ public class TestUnnestWithLateralCorrectness extends 
SubOperatorTest {
 
   }
 
-  @Ignore ("Batch limits need to be sync'd with tthe record batch sizer. Fix 
once the calulations are stabilized")
   @Test
   public void testUnnestLimitBatchSize() {
 
-    final int limitedOutputBatchSize = 1024;
-    final int inputBatchSize = 1024+1;
-    final int limitedOutputBatchSizeBytes = 1024*(4 + 4 + 4 * inputBatchSize); 
// num rows * (size of int + size of
-                                                                               
// int + size of int * num entries in
-                                                                               
// array)
+    final int limitedOutputBatchSize = 127;
+    final int inputBatchSize = limitedOutputBatchSize + 1;
+    // size of lateral output batch = 4N * (N + 5) bytes, where N = output 
batch row count
+    //  Lateral output batch size =  N * input row size + N * size of single 
unnest column
+    //                            =  N * (size of row id + size of array 
offset vector + (N + 1 )*size of single array entry))
+    //                              + N * 4
+    //                            = N * (4 + 2*4 + (N+1)*4 )  + N * 4
+    //                            = N * (16 + 4N) + N * 4
+    //                            = 4N * (N + 5)
+    // configure the output batch size to be one more record than that so that 
the batch sizer can round down
+    final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * 
(limitedOutputBatchSize + 6);
+
     // single record batch with single row. The unnest column has one
     // more record than the batch size we want in the output
     Object[][] data = new Object[1][1];
@@ -360,11 +364,10 @@ public class TestUnnestWithLateralCorrectness extends 
SubOperatorTest {
 
     // similar to previous test; we split a record across more than one batch.
     // but we also set a limit less than the size of the batch so only one 
batch gets output.
-    final int limitedOutputBatchSize = 1024;
-    final int inputBatchSize = 1024+1;
-    final int limitedOutputBatchSizeBytes = 1024*(4 + 4 + 4 * inputBatchSize); 
// num rows * (size of int + size of
-                                                                               
// int + size of int * num entries in
-                                                                               
// array)
+    final int limitedOutputBatchSize = 127;
+    final int inputBatchSize = limitedOutputBatchSize + 1;
+    final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * 
(limitedOutputBatchSize + 6);
+
     // single record batch with single row. The unnest column has one
     // more record than the batch size we want in the output
     Object[][] data = new Object[1][1];
@@ -419,11 +422,10 @@ public class TestUnnestWithLateralCorrectness extends 
SubOperatorTest {
 
     // similar to previous test but the size of the array fits exactly into 
the record batch;
 
-    final int limitedOutputBatchSize = 1024;
-    final int inputBatchSize = 1024;
-    final int limitedOutputBatchSizeBytes = 1024*(4 + 4 + 4 * inputBatchSize); 
// num rows * (size of int + size of
-                                                                               
// int + size of int * num entries in
-                                                                               
// array)
+    final int limitedOutputBatchSize = 127;
+    final int inputBatchSize = limitedOutputBatchSize + 1;
+    final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * 
(limitedOutputBatchSize + 6);
+
     // single record batch with single row. The unnest column has one
     // more record than the batch size we want in the output
     Object[][] data = new Object[1][1];
@@ -682,15 +684,16 @@ public class TestUnnestWithLateralCorrectness extends 
SubOperatorTest {
    *    ]
    *  }
    *
-   * @see TestResultSetLoaderMapArray TestResultSetLoaderMapArray for similar 
schema and data
+   * @see 
org.apache.drill.exec.physical.rowSet.impl.TestResultSetLoaderMapArray 
TestResultSetLoaderMapArray for
+   * similar schema and data
    * @return TupleMetadata corresponding to the schema
    */
   private TupleMetadata getRepeatedMapSchema() {
     TupleMetadata schema = new SchemaBuilder()
         .add("rowNum", TypeProtos.MinorType.INT)
         .addMapArray("unnestColumn")
-        .add("colA", TypeProtos.MinorType.INT)
-        .addArray("colB", TypeProtos.MinorType.VARCHAR)
+          .add("colA", TypeProtos.MinorType.INT)
+          .addArray("colB", TypeProtos.MinorType.VARCHAR)
         .resumeSchema()
         .buildSchema();
     return schema;
@@ -742,6 +745,30 @@ public class TestUnnestWithLateralCorrectness extends 
SubOperatorTest {
     return d;
   }
 
+  private Object[][][] getNestedMapBaseline() {
+
+    Object[][][] d = {
+        {
+            {2,2,2,2,3,3,3,3,4,4,4,4},
+            {
+                "1.1.1",
+                "1.1.2",
+                "1.2.1",
+                "1.2.2",
+                "2.1.1",
+                "2.1.2",
+                "2.3.1",
+                "2.3.2",
+                "3.1.1",
+                "3.1.2",
+                "3.2.1",
+                "3.2.2"
+            }
+        }
+    };
+    return d;
+  }
+
   private boolean compareMapBaseline(Object baselineValue, Object vector) {
     String vv = vector.toString();
     String b = (String)baselineValue;
@@ -769,5 +796,216 @@ public class TestUnnestWithLateralCorrectness extends 
SubOperatorTest {
         == RecordBatch.IterOutcome.OUT_OF_MEMORY);
   }
 
+
+  /**
+   *     Run a plan like the following for various input batches :
+   *             Lateral1
+   *               /    \
+   *              /    Lateral2
+   *            Scan      / \
+   *                     /   \
+   *                Project1 Project2
+   *                   /       \
+   *                  /         \
+   *              Unnest1      Unnest2
+   *
+   *
+   * @param incomingSchemas
+   * @param iterOutcomes
+   * @param execKill
+   * @param data
+   * @param baseline
+   * @param <T>
+   * @throws Exception
+   */
+
+
+  private <T> void testNestedUnnest( TupleMetadata[] incomingSchemas,
+      RecordBatch.IterOutcome[] iterOutcomes,
+      int execKill, // number of batches after which to kill the execution (!)
+      T[][] data,
+      T[][][] baseline) throws Exception {
+
+    // Get the incoming container with dummy data for LJ
+    final List<VectorContainer> incomingContainer = new 
ArrayList<>(data.length);
+
+    // Create data
+    ArrayList<RowSet.SingleRowSet> rowSets = new ArrayList<>();
+    int rowNumber = 0;
+    int batchNum = 0;
+    for ( Object[] recordBatch : data) {
+      RowSetBuilder rowSetBuilder = 
fixture.rowSetBuilder(incomingSchemas[batchNum]);
+      for ( Object rowData : recordBatch) {
+        rowSetBuilder.addRow(++rowNumber, rowData);
+      }
+      RowSet.SingleRowSet rowSet = rowSetBuilder.build();
+      rowSets.add(rowSet);
+      incomingContainer.add(rowSet.container());
+      batchNum++;
+    }
+
+    // Get the unnest POPConfig
+    final UnnestPOP unnestPopConfig1 = new UnnestPOP(null, 
SchemaPath.getSimplePath("unnestColumn"));
+    final UnnestPOP unnestPopConfig2 = new UnnestPOP(null, 
SchemaPath.getSimplePath("colB"));
+
+    // Get the IterOutcomes for LJ
+    final List<RecordBatch.IterOutcome> outcomes = new 
ArrayList<>(iterOutcomes.length);
+    for(RecordBatch.IterOutcome o : iterOutcomes) {
+      outcomes.add(o);
+    }
+
+    // Create incoming MockRecordBatch
+    final MockRecordBatch incomingMockBatch =
+        new MockRecordBatch(fixture.getFragmentContext(), operatorContext, 
incomingContainer, outcomes,
+            incomingContainer.get(0).getSchema());
+
+    // setup Unnest record batch
+    final UnnestRecordBatch unnestBatch1 =
+        new UnnestRecordBatch(unnestPopConfig1, fixture.getFragmentContext());
+    final UnnestRecordBatch unnestBatch2 =
+        new UnnestRecordBatch(unnestPopConfig2, fixture.getFragmentContext());
+
+    // Create intermediate Project
+    final Project projectPopConfig1 =
+        new Project(DrillLogicalTestutils.parseExprs("unnestColumn.colB", 
"colB"), unnestPopConfig1);
+    final ProjectRecordBatch projectBatch1 =
+        new ProjectRecordBatch(projectPopConfig1, unnestBatch1, 
fixture.getFragmentContext());
+    final Project projectPopConfig2 =
+        new Project(DrillLogicalTestutils.parseExprs("colB", "unnestColumn2"), 
unnestPopConfig2);
+    final ProjectRecordBatch projectBatch2 =
+        new ProjectRecordBatch(projectPopConfig2, unnestBatch2, 
fixture.getFragmentContext());
+
+    final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1, 
projectPopConfig2, JoinRelType.FULL);
+    final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig, 
ljPopConfig2, JoinRelType.FULL);
+
+    final LateralJoinBatch lateralJoinBatch2 =
+        new LateralJoinBatch(ljPopConfig2, fixture.getFragmentContext(), 
projectBatch1, projectBatch2);
+    final LateralJoinBatch lateralJoinBatch1 =
+        new LateralJoinBatch(ljPopConfig1, fixture.getFragmentContext(), 
incomingMockBatch, lateralJoinBatch2);
+
+    // set pointer to Lateral in unnest
+    unnestBatch1.setIncoming((LateralContract) lateralJoinBatch1);
+    unnestBatch2.setIncoming((LateralContract) lateralJoinBatch2);
+
+    // Simulate the pipeline by calling next on the incoming
+
+    // results is an array ot batches, each batch being an array of output 
vectors.
+    List<List<ValueVector> > resultList = new ArrayList<>();
+    List<List<ValueVector> > results = null;
+    int batchesProcessed = 0;
+    try{
+      try {
+        while (!isTerminal(lateralJoinBatch1.next())) {
+          if (lateralJoinBatch1.getRecordCount() > 0) {
+            addBatchToResults(resultList, lateralJoinBatch1);
+          }
+          batchesProcessed++;
+          if (batchesProcessed == execKill) {
+            lateralJoinBatch1.getContext().getExecutorState().fail(new 
DrillException("Testing failure of execution."));
+            lateralJoinBatch1.kill(true);
+          }
+          // else nothing to do
+        }
+      } catch (UserException e) {
+        throw e;
+      } catch (Exception e) {
+        throw new Exception ("Test failed to execute lateralJoinBatch.next() 
because: " + e.getMessage());
+      }
+
+      // Check results against baseline
+      results = resultList;
+
+      int batchIndex = 0;
+      int vectorIndex = 0;
+      //int valueIndex = 0;
+      for ( List<ValueVector> batch: results) {
+        int vectorCount= batch.size();
+        if (vectorCount!= baseline[batchIndex].length+2) { // baseline does 
not include the original unnest column(s)
+          fail("Test failed in validating unnest output. Batch column count 
mismatch.");
+        }
+        for (ValueVector vv : batch) {
+          if(vv.getField().getName().equals("unnestColumn") || 
vv.getField().getName().equals("colB")) {
+            continue; // skip the original input column
+          }
+          int valueCount = vv.getAccessor().getValueCount();
+          if (valueCount!= baseline[batchIndex][vectorIndex].length) {
+            fail("Test failed in validating unnest output. Value count 
mismatch in batch number " + (batchIndex+1) +""
+                + ".");
+          }
+
+          for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
+            if (vv instanceof MapVector) {
+              if 
(!compareMapBaseline(baseline[batchIndex][vectorIndex][valueIndex], vv
+                  .getAccessor()
+                  .getObject(valueIndex))) {
+                fail("Test failed in validating unnest(Map) output. Value 
mismatch");
+              }
+            } else if (vv instanceof VarCharVector) {
+              Object val = vv.getAccessor().getObject(valueIndex);
+              if (((String) 
baseline[batchIndex][vectorIndex][valueIndex]).compareTo(val.toString()) != 0) {
+                fail("Test failed in validating unnest output. Value mismatch. 
Baseline value[]" + vectorIndex + "][" + valueIndex
+                    + "]" + ": " + baseline[vectorIndex][valueIndex] + "   
VV.getObject(valueIndex): " + val);
+              }
+            } else {
+              Object val = vv.getAccessor().getObject(valueIndex);
+              if (!baseline[batchIndex][vectorIndex][valueIndex].equals(val)) {
+                fail("Test failed in validating unnest output. Value mismatch. 
Baseline value[" + vectorIndex + "][" + valueIndex
+                    + "]" + ": "
+                    + 
((Object[])baseline[batchIndex][vectorIndex])[valueIndex] + "   
VV.getObject(valueIndex): " + val);
+              }
+            }
+          }
+          vectorIndex++;
+        }
+        vectorIndex=0;
+        batchIndex++;
+      }
+    } catch (UserException e) {
+      throw e; // Valid exception
+    } catch (Exception e) {
+      fail("Test failed. Exception : " + e.getMessage());
+    } finally {
+      // Close all the resources for this test case
+      unnestBatch1.close();
+      lateralJoinBatch1.close();
+      unnestBatch2.close();
+      lateralJoinBatch2.close();
+      incomingMockBatch.close();
+
+      if (results != null) {
+        for (List<ValueVector> batch : results) {
+          for (ValueVector vv : batch) {
+            vv.clear();
+          }
+        }
+      }
+      for(RowSet.SingleRowSet rowSet: rowSets) {
+        rowSet.clear();
+      }
+    }
+
+  }
+
+  @Test
+  public void testNestedUnnestMapColumn() {
+
+    Object[][] data = getMapData();
+
+    // Create input schema
+    TupleMetadata incomingSchema = getRepeatedMapSchema();
+    TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema};
+
+    Object[][][] baseline = getNestedMapBaseline();
+
+    RecordBatch.IterOutcome[] iterOutcomes = 
{RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
+
+    try {
+      testNestedUnnest(incomingSchemas, iterOutcomes, 0, data, baseline);
+    } catch (Exception e) {
+      fail("Failed due to exception: " + e.getMessage());
+    }
+
+  }
+
 }
 

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to