[ 
https://issues.apache.org/jira/browse/DRILL-6440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488033#comment-16488033
 ] 

ASF GitHub Bot commented on DRILL-6440:
---------------------------------------

sohami closed pull request #1283: DRILL-6440: Unnest unit tests and fixes for stats
URL: https://github.com/apache/drill/pull/1283
 
 
   

This PR was merged from a forked repository (a "foreign" pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 57a0adeb7c..e985c4defe 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -207,6 +207,9 @@ public IterOutcome innerNext() {
       } finally {
         stats.stopSetup();
       }
+      // since we never called next on an upstream operator, incoming stats are
+      // not updated. update input stats explicitly.
+      stats.batchReceived(0, incoming.getRecordCount(), true);
       return IterOutcome.OK_NEW_SCHEMA;
     } else {
       assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA";
@@ -223,11 +226,13 @@ public IterOutcome innerNext() {
           context.getExecutorState().fail(ex);
           return IterOutcome.STOP;
         }
+        stats.batchReceived(0, incoming.getRecordCount(), true);
         return OK_NEW_SCHEMA;
       }
       if (lateral.getRecordIndex() == 0) {
         unnest.resetGroupIndex();
       }
+      stats.batchReceived(0, incoming.getRecordCount(), false);
       return doWork();
     }
 
@@ -348,8 +353,7 @@ protected IterOutcome doWork() {
     recordCount = 0;
     final List<TransferPair> transfers = Lists.newArrayList();
 
-    final FieldReference fieldReference =
-        new FieldReference(popConfig.getColumn());
+    final FieldReference fieldReference = new 
FieldReference(popConfig.getColumn());
 
     final TransferPair transferPair = 
getUnnestFieldTransferPair(fieldReference);
 
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
index 137966ba33..c04bff7753 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
@@ -615,7 +615,8 @@ public void testUnnestNonArrayColumn() {
    *    ]
    *  }
    *
-   * @see TestResultSetLoaderMapArray TestResultSetLoaderMapArray for similar 
schema and data
+   * @see 
org.apache.drill.exec.physical.rowSet.impl.TestResultSetLoaderMapArray 
TestResultSetLoaderMapArray for
+   * similar schema and data
    * @return TupleMetadata corresponding to the schema
    */
   private TupleMetadata getRepeatedMapSchema() {
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index 9318c516b9..f281964fb4 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -48,7 +48,6 @@
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -147,7 +146,6 @@ public void testUnnestVarWidthColumn() {
 
   }
 
-  @Ignore("RecordBatchSizer throws Exception in RecordBatchSizer.expandMap")
   @Test
   public void testUnnestMapColumn() {
 
@@ -297,15 +295,21 @@ public void testUnnestSchemaChange() {
 
   }
 
-  @Ignore ("Batch limits need to be sync'd with tthe record batch sizer. Fix 
once the calulations are stabilized")
   @Test
   public void testUnnestLimitBatchSize() {
 
-    final int limitedOutputBatchSize = 1024;
-    final int inputBatchSize = 1024+1;
-    final int limitedOutputBatchSizeBytes = 1024*(4 + 4 + 4 * inputBatchSize); 
// num rows * (size of int + size of
-                                                                               
// int + size of int * num entries in
-                                                                               
// array)
+    final int limitedOutputBatchSize = 127;
+    final int inputBatchSize = limitedOutputBatchSize + 1;
+    // size of lateral output batch = 4N * (N + 5) bytes, where N = output 
batch row count
+    //  Lateral output batch size =  N * input row size + N * size of single 
unnest column
+    //                            =  N * (size of row id + size of array 
offset vector + (N + 1 )*size of single array entry))
+    //                              + N * 4
+    //                            = N * (4 + 2*4 + (N+1)*4 )  + N * 4
+    //                            = N * (16 + 4N) + N * 4
+    //                            = 4N * (N + 5)
+    // configure the output batch size to be one more record than that so that 
the batch sizer can round down
+    final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * 
(limitedOutputBatchSize + 6);
+
     // single record batch with single row. The unnest column has one
     // more record than the batch size we want in the output
     Object[][] data = new Object[1][1];
@@ -360,11 +364,10 @@ public void testUnnestKillFromLimitSubquery1() {
 
     // similar to previous test; we split a record across more than one batch.
     // but we also set a limit less than the size of the batch so only one 
batch gets output.
-    final int limitedOutputBatchSize = 1024;
-    final int inputBatchSize = 1024+1;
-    final int limitedOutputBatchSizeBytes = 1024*(4 + 4 + 4 * inputBatchSize); 
// num rows * (size of int + size of
-                                                                               
// int + size of int * num entries in
-                                                                               
// array)
+    final int limitedOutputBatchSize = 127;
+    final int inputBatchSize = limitedOutputBatchSize + 1;
+    final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * 
(limitedOutputBatchSize + 6);
+
     // single record batch with single row. The unnest column has one
     // more record than the batch size we want in the output
     Object[][] data = new Object[1][1];
@@ -419,11 +422,10 @@ public void testUnnestKillFromLimitSubquery2() {
 
     // similar to previous test but the size of the array fits exactly into 
the record batch;
 
-    final int limitedOutputBatchSize = 1024;
-    final int inputBatchSize = 1024;
-    final int limitedOutputBatchSizeBytes = 1024*(4 + 4 + 4 * inputBatchSize); 
// num rows * (size of int + size of
-                                                                               
// int + size of int * num entries in
-                                                                               
// array)
+    final int limitedOutputBatchSize = 127;
+    final int inputBatchSize = limitedOutputBatchSize + 1;
+    final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * 
(limitedOutputBatchSize + 6);
+
     // single record batch with single row. The unnest column has one
     // more record than the batch size we want in the output
     Object[][] data = new Object[1][1];
@@ -682,15 +684,16 @@ public void testUnnestNonArrayColumn() {
    *    ]
    *  }
    *
-   * @see TestResultSetLoaderMapArray TestResultSetLoaderMapArray for similar 
schema and data
+   * @see 
org.apache.drill.exec.physical.rowSet.impl.TestResultSetLoaderMapArray 
TestResultSetLoaderMapArray for
+   * similar schema and data
    * @return TupleMetadata corresponding to the schema
    */
   private TupleMetadata getRepeatedMapSchema() {
     TupleMetadata schema = new SchemaBuilder()
         .add("rowNum", TypeProtos.MinorType.INT)
         .addMapArray("unnestColumn")
-        .add("colA", TypeProtos.MinorType.INT)
-        .addArray("colB", TypeProtos.MinorType.VARCHAR)
+          .add("colA", TypeProtos.MinorType.INT)
+          .addArray("colB", TypeProtos.MinorType.VARCHAR)
         .resumeSchema()
         .buildSchema();
     return schema;
@@ -742,6 +745,30 @@ private TupleMetadata getRepeatedMapSchema() {
     return d;
   }
 
+  private Object[][][] getNestedMapBaseline() {
+
+    Object[][][] d = {
+        {
+            {2,2,2,2,3,3,3,3,4,4,4,4},
+            {
+                "1.1.1",
+                "1.1.2",
+                "1.2.1",
+                "1.2.2",
+                "2.1.1",
+                "2.1.2",
+                "2.3.1",
+                "2.3.2",
+                "3.1.1",
+                "3.1.2",
+                "3.2.1",
+                "3.2.2"
+            }
+        }
+    };
+    return d;
+  }
+
   private boolean compareMapBaseline(Object baselineValue, Object vector) {
     String vv = vector.toString();
     String b = (String)baselineValue;
@@ -769,5 +796,216 @@ private boolean isTerminal(RecordBatch.IterOutcome 
outcome) {
         == RecordBatch.IterOutcome.OUT_OF_MEMORY);
   }
 
+
+  /**
+   *     Run a plan like the following for various input batches :
+   *             Lateral1
+   *               /    \
+   *              /    Lateral2
+   *            Scan      / \
+   *                     /   \
+   *                Project1 Project2
+   *                   /       \
+   *                  /         \
+   *              Unnest1      Unnest2
+   *
+   *
+   * @param incomingSchemas
+   * @param iterOutcomes
+   * @param execKill
+   * @param data
+   * @param baseline
+   * @param <T>
+   * @throws Exception
+   */
+
+
+  private <T> void testNestedUnnest( TupleMetadata[] incomingSchemas,
+      RecordBatch.IterOutcome[] iterOutcomes,
+      int execKill, // number of batches after which to kill the execution (!)
+      T[][] data,
+      T[][][] baseline) throws Exception {
+
+    // Get the incoming container with dummy data for LJ
+    final List<VectorContainer> incomingContainer = new 
ArrayList<>(data.length);
+
+    // Create data
+    ArrayList<RowSet.SingleRowSet> rowSets = new ArrayList<>();
+    int rowNumber = 0;
+    int batchNum = 0;
+    for ( Object[] recordBatch : data) {
+      RowSetBuilder rowSetBuilder = 
fixture.rowSetBuilder(incomingSchemas[batchNum]);
+      for ( Object rowData : recordBatch) {
+        rowSetBuilder.addRow(++rowNumber, rowData);
+      }
+      RowSet.SingleRowSet rowSet = rowSetBuilder.build();
+      rowSets.add(rowSet);
+      incomingContainer.add(rowSet.container());
+      batchNum++;
+    }
+
+    // Get the unnest POPConfig
+    final UnnestPOP unnestPopConfig1 = new UnnestPOP(null, 
SchemaPath.getSimplePath("unnestColumn"));
+    final UnnestPOP unnestPopConfig2 = new UnnestPOP(null, 
SchemaPath.getSimplePath("colB"));
+
+    // Get the IterOutcomes for LJ
+    final List<RecordBatch.IterOutcome> outcomes = new 
ArrayList<>(iterOutcomes.length);
+    for(RecordBatch.IterOutcome o : iterOutcomes) {
+      outcomes.add(o);
+    }
+
+    // Create incoming MockRecordBatch
+    final MockRecordBatch incomingMockBatch =
+        new MockRecordBatch(fixture.getFragmentContext(), operatorContext, 
incomingContainer, outcomes,
+            incomingContainer.get(0).getSchema());
+
+    // setup Unnest record batch
+    final UnnestRecordBatch unnestBatch1 =
+        new UnnestRecordBatch(unnestPopConfig1, fixture.getFragmentContext());
+    final UnnestRecordBatch unnestBatch2 =
+        new UnnestRecordBatch(unnestPopConfig2, fixture.getFragmentContext());
+
+    // Create intermediate Project
+    final Project projectPopConfig1 =
+        new Project(DrillLogicalTestutils.parseExprs("unnestColumn.colB", 
"colB"), unnestPopConfig1);
+    final ProjectRecordBatch projectBatch1 =
+        new ProjectRecordBatch(projectPopConfig1, unnestBatch1, 
fixture.getFragmentContext());
+    final Project projectPopConfig2 =
+        new Project(DrillLogicalTestutils.parseExprs("colB", "unnestColumn2"), 
unnestPopConfig2);
+    final ProjectRecordBatch projectBatch2 =
+        new ProjectRecordBatch(projectPopConfig2, unnestBatch2, 
fixture.getFragmentContext());
+
+    final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1, 
projectPopConfig2, JoinRelType.FULL);
+    final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig, 
ljPopConfig2, JoinRelType.FULL);
+
+    final LateralJoinBatch lateralJoinBatch2 =
+        new LateralJoinBatch(ljPopConfig2, fixture.getFragmentContext(), 
projectBatch1, projectBatch2);
+    final LateralJoinBatch lateralJoinBatch1 =
+        new LateralJoinBatch(ljPopConfig1, fixture.getFragmentContext(), 
incomingMockBatch, lateralJoinBatch2);
+
+    // set pointer to Lateral in unnest
+    unnestBatch1.setIncoming((LateralContract) lateralJoinBatch1);
+    unnestBatch2.setIncoming((LateralContract) lateralJoinBatch2);
+
+    // Simulate the pipeline by calling next on the incoming
+
+    // results is an array ot batches, each batch being an array of output 
vectors.
+    List<List<ValueVector> > resultList = new ArrayList<>();
+    List<List<ValueVector> > results = null;
+    int batchesProcessed = 0;
+    try{
+      try {
+        while (!isTerminal(lateralJoinBatch1.next())) {
+          if (lateralJoinBatch1.getRecordCount() > 0) {
+            addBatchToResults(resultList, lateralJoinBatch1);
+          }
+          batchesProcessed++;
+          if (batchesProcessed == execKill) {
+            lateralJoinBatch1.getContext().getExecutorState().fail(new 
DrillException("Testing failure of execution."));
+            lateralJoinBatch1.kill(true);
+          }
+          // else nothing to do
+        }
+      } catch (UserException e) {
+        throw e;
+      } catch (Exception e) {
+        throw new Exception ("Test failed to execute lateralJoinBatch.next() 
because: " + e.getMessage());
+      }
+
+      // Check results against baseline
+      results = resultList;
+
+      int batchIndex = 0;
+      int vectorIndex = 0;
+      //int valueIndex = 0;
+      for ( List<ValueVector> batch: results) {
+        int vectorCount= batch.size();
+        if (vectorCount!= baseline[batchIndex].length+2) { // baseline does 
not include the original unnest column(s)
+          fail("Test failed in validating unnest output. Batch column count 
mismatch.");
+        }
+        for (ValueVector vv : batch) {
+          if(vv.getField().getName().equals("unnestColumn") || 
vv.getField().getName().equals("colB")) {
+            continue; // skip the original input column
+          }
+          int valueCount = vv.getAccessor().getValueCount();
+          if (valueCount!= baseline[batchIndex][vectorIndex].length) {
+            fail("Test failed in validating unnest output. Value count 
mismatch in batch number " + (batchIndex+1) +""
+                + ".");
+          }
+
+          for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
+            if (vv instanceof MapVector) {
+              if 
(!compareMapBaseline(baseline[batchIndex][vectorIndex][valueIndex], vv
+                  .getAccessor()
+                  .getObject(valueIndex))) {
+                fail("Test failed in validating unnest(Map) output. Value 
mismatch");
+              }
+            } else if (vv instanceof VarCharVector) {
+              Object val = vv.getAccessor().getObject(valueIndex);
+              if (((String) 
baseline[batchIndex][vectorIndex][valueIndex]).compareTo(val.toString()) != 0) {
+                fail("Test failed in validating unnest output. Value mismatch. 
Baseline value[]" + vectorIndex + "][" + valueIndex
+                    + "]" + ": " + baseline[vectorIndex][valueIndex] + "   
VV.getObject(valueIndex): " + val);
+              }
+            } else {
+              Object val = vv.getAccessor().getObject(valueIndex);
+              if (!baseline[batchIndex][vectorIndex][valueIndex].equals(val)) {
+                fail("Test failed in validating unnest output. Value mismatch. 
Baseline value[" + vectorIndex + "][" + valueIndex
+                    + "]" + ": "
+                    + 
((Object[])baseline[batchIndex][vectorIndex])[valueIndex] + "   
VV.getObject(valueIndex): " + val);
+              }
+            }
+          }
+          vectorIndex++;
+        }
+        vectorIndex=0;
+        batchIndex++;
+      }
+    } catch (UserException e) {
+      throw e; // Valid exception
+    } catch (Exception e) {
+      fail("Test failed. Exception : " + e.getMessage());
+    } finally {
+      // Close all the resources for this test case
+      unnestBatch1.close();
+      lateralJoinBatch1.close();
+      unnestBatch2.close();
+      lateralJoinBatch2.close();
+      incomingMockBatch.close();
+
+      if (results != null) {
+        for (List<ValueVector> batch : results) {
+          for (ValueVector vv : batch) {
+            vv.clear();
+          }
+        }
+      }
+      for(RowSet.SingleRowSet rowSet: rowSets) {
+        rowSet.clear();
+      }
+    }
+
+  }
+
+  @Test
+  public void testNestedUnnestMapColumn() {
+
+    Object[][] data = getMapData();
+
+    // Create input schema
+    TupleMetadata incomingSchema = getRepeatedMapSchema();
+    TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema};
+
+    Object[][][] baseline = getNestedMapBaseline();
+
+    RecordBatch.IterOutcome[] iterOutcomes = 
{RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
+
+    try {
+      testNestedUnnest(incomingSchemas, iterOutcomes, 0, data, baseline);
+    } catch (Exception e) {
+      fail("Failed due to exception: " + e.getMessage());
+    }
+
+  }
+
 }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Fix ignored unit tests in unnest
> --------------------------------
>
>                 Key: DRILL-6440
>                 URL: https://issues.apache.org/jira/browse/DRILL-6440
>             Project: Apache Drill
>          Issue Type: Improvement
>            Reporter: Parth Chandra
>            Assignee: Parth Chandra
>            Priority: Major
>              Labels: ready-to-commit
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to