DRILL-827: Fix bug in reading dictionary encoded columns in parquet.

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/10127846
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/10127846
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/10127846

Branch: refs/heads/master
Commit: 1012784682a3e9ed27076bfa33b8c4cb2361a542
Parents: 602f817
Author: Jason Altekruse <[email protected]>
Authored: Fri May 23 11:14:52 2014 -0500
Committer: Jacques Nadeau <[email protected]>
Committed: Wed May 28 15:13:00 2014 -0700

----------------------------------------------------------------------
 .../exec/store/parquet/PageReadStatus.java      |  2 +-
 .../exec/store/parquet/ParquetRecordReader.java |  2 +-
 .../exec/store/parquet/VarLenBinaryReader.java  |  7 ++++-
 .../store/parquet/ParquetRecordReaderTest.java  | 27 +++++++++++++++-----
 .../store/parquet/ParquetResultListener.java    | 16 +++++++-----
 5 files changed, 37 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/10127846/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
index e4081d9..ba98f3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
@@ -167,10 +167,10 @@ final class PageReadStatus {
       parentColumnReader.currDefLevel = -1;
       if (!currentPage.getValueEncoding().usesDictionary()) {
         definitionLevels = 
currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor,
 ValuesType.DEFINITION_LEVEL);
-        valueReader = 
currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor,
 ValuesType.VALUES);
         definitionLevels.initFromPage(currentPage.getValueCount(), 
pageDataByteArray, 0);
         readPosInBytes = definitionLevels.getNextOffset();
         if (parentColumnReader.columnDescriptor.getType() == 
PrimitiveType.PrimitiveTypeName.BOOLEAN) {
+          valueReader = 
currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor,
 ValuesType.VALUES);
           valueReader.initFromPage(currentPage.getValueCount(), 
pageDataByteArray, (int) readPosInBytes);
         }
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/10127846/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 0996620..6754855 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -248,7 +248,7 @@ public class ParquetRecordReader implements RecordReader {
       boolean fieldFixedLength = false;
       for (int i = 0; i < columns.size(); ++i) {
         column = columns.get(i);
-        columnChunkMetaData = footer.getBlocks().get(0).getColumns().get(i);
+        columnChunkMetaData = 
footer.getBlocks().get(rowGroupIndex).getColumns().get(i);
         schemaElement = schemaElements.get(column.getPath()[0]);
         convertedType = schemaElement.getConverted_type();
         MajorType type = toMajorType(column.getType(), 
schemaElement.getType_length(), getDataMode(column), schemaElement);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/10127846/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
index 4efcdaf..2575c4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
@@ -100,6 +100,8 @@ public class VarLenBinaryReader {
           if (!columnReader.pageReadStatus.next()) {
             rowGroupFinished = true;
             break;
+          } else {
+            columnReader.currDictVal = null;
           }
         }
         bytes = columnReader.pageReadStatus.pageDataByteArray;
@@ -118,7 +120,9 @@ public class VarLenBinaryReader {
         }
 
         if (columnReader.usingDictionary) {
-          columnReader.currDictVal = 
columnReader.pageReadStatus.valueReader.readBytes();
+          if (columnReader.currDictVal == null) {
+            columnReader.currDictVal = 
columnReader.pageReadStatus.valueReader.readBytes();
+          }
           // re-purposing  this field here for length in BYTES to prevent 
repetitive multiplication/division
           columnReader.dataTypeLengthInBits = 
columnReader.currDictVal.length();
         }
@@ -169,6 +173,7 @@ public class VarLenBinaryReader {
           columnReader.totalValuesRead += 
columnReader.pageReadStatus.valuesRead;
           columnReader.pageReadStatus.next();
         }
+        columnReader.currDictVal = null;
       }
       recordsReadInCurrentPass++;
     } while (recordsReadInCurrentPass < recordsToReadInThisPass);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/10127846/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 222508c..82436a3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -117,7 +117,17 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
         readEntries += ",";
     }
     String planText = 
Files.toString(FileUtils.getResourceAsFile("/parquet/parquet_scan_screen_read_entry_replace.json"),
 Charsets.UTF_8).replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries);
-    testParquetFullEngineLocalText(planText, fileName, i, numberRowGroups, 
recordsPerRowGroup);
+    testParquetFullEngineLocalText(planText, fileName, i, numberRowGroups, 
recordsPerRowGroup, true);
+  }
+
+  @Test
+  @Ignore
+  public void testDictionaryError() throws Exception {
+    String readEntries;
+    readEntries = "\"/tmp/lineitem_null_dict.parquet\"";
+
+    String planText = 
Files.toString(FileUtils.getResourceAsFile("/parquet/parquet_scan_screen_read_entry_replace.json"),
 Charsets.UTF_8).replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries);
+    testParquetFullEngineLocalText(planText, fileName, 1, 1, 100000, false);
   }
 
   @Test
@@ -135,21 +145,24 @@ public class ParquetRecordReaderTest extends 
BaseTestQuery{
 
 
   public void testParquetFullEngineLocalPath(String planFileName, String 
filename, int numberOfTimesRead /* specified in json plan */, int 
numberOfRowGroups, int recordsPerRowGroup) throws Exception{
-    
testParquetFullEngineLocalText(Files.toString(FileUtils.getResourceAsFile(planFileName),
 Charsets.UTF_8), filename, numberOfTimesRead, numberOfRowGroups, 
recordsPerRowGroup);
+    
testParquetFullEngineLocalText(Files.toString(FileUtils.getResourceAsFile(planFileName),
 Charsets.UTF_8), filename,
+        numberOfTimesRead, numberOfRowGroups, recordsPerRowGroup, true);
   }
 
   //specific tests should call this method, but it is not marked as a test 
itself intentionally
-  public void testParquetFullEngineLocalText(String planText, String filename, 
int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int 
recordsPerRowGroup) throws Exception{
-    testFull(QueryType.LOGICAL, planText, filename, numberOfTimesRead, 
numberOfRowGroups, recordsPerRowGroup);
+  public void testParquetFullEngineLocalText(String planText, String filename, 
int numberOfTimesRead /* specified in json plan */,
+                                             int numberOfRowGroups, int 
recordsPerRowGroup, boolean testValues) throws Exception{
+    testFull(QueryType.LOGICAL, planText, filename, numberOfTimesRead, 
numberOfRowGroups, recordsPerRowGroup, testValues);
   }
 
-  private void testFull(QueryType type, String planText, String filename, int 
numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int 
recordsPerRowGroup) throws Exception{
+  private void testFull(QueryType type, String planText, String filename, int 
numberOfTimesRead /* specified in json plan */,
+                        int numberOfRowGroups, int recordsPerRowGroup, boolean 
testValues) throws Exception{
 
 //    RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
     HashMap<String, FieldInfo> fields = new HashMap<>();
     ParquetTestProperties props = new ParquetTestProperties(numberRowGroups, 
recordsPerRowGroup, DEFAULT_BYTES_PER_PAGE, fields);
     TestFileGenerator.populateFieldInfoMap(props);
-    ParquetResultListener resultListener = new 
ParquetResultListener(getAllocator(), props, numberOfTimesRead, true);
+    ParquetResultListener resultListener = new 
ParquetResultListener(getAllocator(), props, numberOfTimesRead, testValues);
     Stopwatch watch = new Stopwatch().start();
     testWithListener(type, planText, resultListener);
     resultListener.getResults();
@@ -162,7 +175,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
   //use this method to submit physical plan
   public void testParquetFullEngineLocalTextDistributed(String planName, 
String filename, int numberOfTimesRead /* specified in json plan */, int 
numberOfRowGroups, int recordsPerRowGroup) throws Exception{
     String planText = Files.toString(FileUtils.getResourceAsFile(planName), 
Charsets.UTF_8);
-    testFull(QueryType.PHYSICAL, planText, filename, numberOfTimesRead, 
numberOfRowGroups, recordsPerRowGroup);
+    testFull(QueryType.PHYSICAL, planText, filename, numberOfTimesRead, 
numberOfRowGroups, recordsPerRowGroup, true);
   }
 
   public String pad(String value, int length) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/10127846/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
index a533117..4a0efc9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
@@ -173,15 +173,17 @@ public class ParquetResultListener implements 
UserResultsListener {
     int recordsInBatch = -1;
     if(result.getHeader().getIsLastChunk()){
       // ensure the right number of columns was returned, especially important 
to ensure selective column read is working
-      //assert valuesChecked.keySet().size() == props.fields.keySet().size() : 
"Unexpected number of output columns from parquet scan,";
+      if (testValues) {
+        assertEquals( "Unexpected number of output columns from parquet 
scan.", valuesChecked.keySet().size(), props.fields.keySet().size() );
+      }
       for (String s : valuesChecked.keySet()) {
         try {
-           if (recordsInBatch == -1 ){
-             recordsInBatch = valuesChecked.get(s);
-           } else {
-             assertEquals("Mismatched record counts in vectors.", 
recordsInBatch, valuesChecked.get(s).intValue());
-           }
-          //assertEquals("Record count incorrect for column: " + s, 
totalRecords, (long) valuesChecked.get(s));
+          if (recordsInBatch == -1 ){
+            recordsInBatch = valuesChecked.get(s);
+          } else {
+            assertEquals("Mismatched record counts in vectors.", 
recordsInBatch, valuesChecked.get(s).intValue());
+          }
+          assertEquals("Record count incorrect for column: " + s, 
totalRecords, (long) valuesChecked.get(s));
         } catch (AssertionError e) { submissionFailed(new RpcException(e)); }
       }
 

Reply via email to