DRILL-1307: add support for fixed binary columns in parquet reader.

DRILL-1314: Fix issue reading impala produced files

DRILL-1304: Regression selecting a single column from a parquet file.

Fixed issue with var length dictionary reading.

Reduced memory usage by freeing buffers after we finish reading a page (except 
for dictionary pages which need to be kept in memory until the entire row group 
has been read)

Rebased onto merge branch.

Successfully backed out the changes that had changed the structure of the 
nullable column readers. This re-introduced some redundancy but avoided a bug 
that was holding up the release. Ended up falling back on the higher level 
reader API, only in the case where we are reading a dictionary column and then 
the next page is not dictionary encoded. This can be fixed to use the optimized 
read instead, but it doesn't conform to the overall structure of the current 
reader and is a bit difficult to fix.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c1fcb652
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c1fcb652
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c1fcb652

Branch: refs/heads/master
Commit: c1fcb6528cf1d037df91d54629f2c71902cb25cc
Parents: 42bbf6f
Author: Jason Altekruse <altekruseja...@gmail.com>
Authored: Mon Aug 25 14:56:27 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Fri Aug 29 18:31:11 2014 -0700

----------------------------------------------------------------------
 .../codegen/templates/NullableValueVectors.java |   6 +-
 .../parquet/columnreaders/ColumnReader.java     |   1 -
 .../columnreaders/ColumnReaderFactory.java      |  19 ++-
 .../columnreaders/FixedByteAlignedReader.java   |  38 +++++-
 .../columnreaders/FixedWidthRepeatedReader.java |   6 +
 .../NullableFixedByteAlignedReaders.java        |  34 ++++-
 .../NullableVarLengthValuesColumn.java          |   2 +
 .../store/parquet/columnreaders/PageReader.java |  71 ++++++----
 .../ParquetFixedWidthDictionaryReader.java      |  53 --------
 .../ParquetFixedWidthDictionaryReaders.java     | 133 +++++++++++++++++++
 .../columnreaders/ParquetRecordReader.java      |  31 +++--
 .../ParquetToDrillTypeConverter.java            |   9 +-
 .../columnreaders/VarLengthColumnReaders.java   |  20 +--
 .../columnreaders/VarLengthValuesColumn.java    |  13 +-
 .../drill/exec/vector/NullableVector.java       |  23 ++++
 .../physical/impl/writer/TestParquetWriter.java |  15 +++
 .../store/parquet/ParquetRecordReaderTest.java  |  58 +++++---
 .../drill/jdbc/test/TestFunctionsQuery.java     |   1 +
 18 files changed, 399 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java 
b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index d9eae0f..492d495 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -44,7 +44,7 @@ package org.apache.drill.exec.vector;
  * NB: this class is automatically generated from ValueVectorTypes.tdd using 
FreeMarker.
  */
 @SuppressWarnings("unused")
-public final class ${className} extends BaseValueVector implements <#if 
type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector{
+public final class ${className} extends BaseValueVector implements <#if 
type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, 
NullableVector{
 
   private int valueCount;
   final UInt1Vector bits;
@@ -96,6 +96,10 @@ public final class ${className} extends BaseValueVector 
implements <#if type.maj
     return values.getData();
   }
 
+  public ${valuesName} getValuesVector() {
+    return values;
+  }
+
   <#if type.major == "VarLen">
   @Override
   public SerializedField getMetadata() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index b240407..272a5c3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -118,7 +118,6 @@ public abstract class ColumnReader<V extends ValueVector> {
     readField(recordsToRead);
 
     valuesReadInCurrentPass += recordsReadInThisIteration;
-    totalValuesRead += recordsReadInThisIteration;
     pageReader.valuesRead += recordsReadInThisIteration;
     pageReader.readPosInBytes = readStartInBytes + readLength;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 243744e..3d36b64 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -20,8 +20,12 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 
+import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.Decimal28SparseVector;
 import org.apache.drill.exec.vector.Decimal38SparseVector;
+import org.apache.drill.exec.vector.Float4Vector;
+import org.apache.drill.exec.vector.Float8Vector;
+import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.NullableBigIntVector;
 import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
 import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
@@ -72,8 +76,19 @@ public class ColumnReaderFactory {
         return new FixedByteAlignedReader.DateReader(recordReader, 
allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
       } else{
         if 
(columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
-          return new ParquetFixedWidthDictionaryReader(recordReader, 
allocateSize, descriptor, columnChunkMetaData,
-              fixedLength, v, schemaElement);
+          switch (columnChunkMetaData.getType()) {
+            case INT32:
+              return new 
ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, 
allocateSize, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, 
schemaElement);
+            case INT64:
+              return new 
ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, 
allocateSize, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, 
schemaElement);
+            case FLOAT:
+              return new 
ParquetFixedWidthDictionaryReaders.DictionaryFloat4Reader(recordReader, 
allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float4Vector) v, 
schemaElement);
+            case DOUBLE:
+              return new 
ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, 
allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, 
schemaElement);
+            default:
+              throw new ExecutionSetupException("Unsupported dictionary column 
type " + descriptor.getType().name() );
+          }
+
         } else {
           return new FixedByteAlignedReader(recordReader, allocateSize, 
descriptor, columnChunkMetaData,
               fixedLength, v, schemaElement);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index 54771e4..bfbefdb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.vector.DateVector;
 import org.apache.drill.exec.vector.Decimal28SparseVector;
 import org.apache.drill.exec.vector.Decimal38SparseVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
 import org.joda.time.DateTimeUtils;
 
 import parquet.column.ColumnDescriptor;
@@ -68,6 +69,29 @@ class FixedByteAlignedReader extends ColumnReader {
         (int) readStartInBytes, (int) readLength);
   }
 
+  public static class FixedBinaryReader extends FixedByteAlignedReader {
+    // TODO - replace this with fixed binary type in drill
+    VariableWidthVector castedVector;
+
+    FixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                    VariableWidthVector v, SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, true, 
v, schemaElement);
+      castedVector = v;
+    }
+
+    protected void readField(long recordsToReadInThisPass) {
+      // we can use the standard read method to transfer the data
+      super.readField(recordsToReadInThisPass);
+      // TODO - replace this with fixed binary type in drill
+      // now we need to write the lengths of each value
+      int byteLength = dataTypeLengthInBits / 8;
+      for (int i = 0; i < recordsToReadInThisPass; i++) {
+        castedVector.getMutator().setValueLengthSafe(i, byteLength);
+      }
+    }
+
+  }
+
   public static abstract class ConvertedReader extends FixedByteAlignedReader {
 
     protected int dataTypeLengthInBytes;
@@ -104,10 +128,20 @@ class FixedByteAlignedReader extends ColumnReader {
 
     @Override
     void addNext(int start, int index) {
-      dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(
-          
NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytebuf, 
start)
+//      dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(
+//          
NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytebuf, 
start)
+      dateVector.getMutator().set(index, 
DateTimeUtils.fromJulianDay(readIntLittleEndian(bytebuf, start)
               - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
     }
+
+    // copied out of parquet library, didn't want to deal with the uneeded 
throws statement they had declared
+    public static int readIntLittleEndian(ByteBuf in, int offset) {
+      int ch4 = in.getByte(offset) & 0xff;
+      int ch3 = in.getByte(offset + 1) & 0xff;
+      int ch2 = in.getByte(offset + 2) & 0xff;
+      int ch1 = in.getByte(offset + 3) & 0xff;
+      return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+    }
   }
 
   public static class Decimal28Reader extends ConvertedReader {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
index bbff574..18618e6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
@@ -51,6 +51,7 @@ public class FixedWidthRepeatedReader extends VarLengthColumn 
{
     castedRepeatedVector = (RepeatedFixedWidthVector) valueVector;
     this.dataTypeLengthInBytes = dataTypeLengthInBytes;
     this.dataReader = dataReader;
+    this.dataReader.pageReader.clear();
     this.dataReader.pageReader = this.pageReader;
     // this is not in the reset method because it needs to be initialized only 
for the very first page read
     // in all other cases if a read ends at a page boundary we will need to 
keep track of this flag and not
@@ -209,5 +210,10 @@ public class FixedWidthRepeatedReader extends 
VarLengthColumn {
   public int capacity() {
     return 
castedRepeatedVector.getMutator().getDataVector().getData().capacity();
   }
+
+  public void clear() {
+    super.clear();
+    dataReader.clear();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index f88d56a..557bd9f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -82,6 +82,10 @@ public class NullableFixedByteAlignedReaders {
         for (int i = 0; i < recordsToReadInThisPass; i++){
           valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readInteger());
         }
+      } else {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.valueReader.readInteger());
+        }
       }
     }
   }
@@ -97,8 +101,14 @@ public class NullableFixedByteAlignedReaders {
     // this method is called by its superclass during a read loop
     @Override
     protected void readField(long recordsToReadInThisPass) {
-      for (int i = 0; i < recordsToReadInThisPass; i++){
-        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readLong());
+      if (usingDictionary) {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readLong());
+        }
+      } else {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.valueReader.readLong());
+        }
       }
     }
   }
@@ -114,8 +124,14 @@ public class NullableFixedByteAlignedReaders {
     // this method is called by its superclass during a read loop
     @Override
     protected void readField(long recordsToReadInThisPass) {
-      for (int i = 0; i < recordsToReadInThisPass; i++){
-        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readFloat());
+      if (usingDictionary) {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readFloat());
+        }
+      } else {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.valueReader.readFloat());
+        }
       }
     }
   }
@@ -131,8 +147,14 @@ public class NullableFixedByteAlignedReaders {
     // this method is called by its superclass during a read loop
     @Override
     protected void readField(long recordsToReadInThisPass) {
-      for (int i = 0; i < recordsToReadInThisPass; i++){
-        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readDouble());
+      if (usingDictionary) {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readDouble());
+        }
+      } else {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.valueReader.readDouble());
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
index ba9ff80..dc29fbd 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
@@ -116,6 +116,8 @@ public abstract class NullableVarLengthValuesColumn<V 
extends ValueVector> exten
 
   @Override
   protected void readField(long recordsToRead) {
+    // TODO - unlike most implementations of this method, the 
recordsReadInThisIteration field is not set here
+    // should verify that this is not breaking anything
     if (usingDictionary) {
       currDictValToWrite = pageReader.dictionaryValueReader.readBytes();
       // re-purposing  this field here for length in BYTES to prevent 
repetitive multiplication/division

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 4165cbd..639577d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -91,10 +91,14 @@ final class PageReader {
 
   List<ByteBuf> allocatedBuffers;
 
+  // These need to be held throughout reading of the entire column chunk
+  List<ByteBuf> allocatedDictionaryBuffers;
+
   PageReader(ColumnReader parentStatus, FileSystem fs, Path path, 
ColumnChunkMetaData columnChunkMetaData)
     throws ExecutionSetupException{
     this.parentColumnReader = parentStatus;
     allocatedBuffers = new ArrayList<ByteBuf>();
+    allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
 
     long totalByteLength = columnChunkMetaData.getTotalUncompressedSize();
     long start = columnChunkMetaData.getFirstDataPageOffset();
@@ -108,7 +112,7 @@ final class PageReader {
 
         BytesInput bytesIn;
         ByteBuf 
uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size());
-        allocatedBuffers.add(uncompressedData);
+        allocatedDictionaryBuffers.add(uncompressedData);
         
if(parentColumnReader.columnChunkMetaData.getCodec()==CompressionCodecName.UNCOMPRESSED)
 {
           dataReader.getPageAsBytesBuf(uncompressedData, 
pageHeader.compressed_page_size);
           
bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData,
@@ -157,6 +161,7 @@ final class PageReader {
     if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == 
parentColumnReader.columnChunkMetaData.getValueCount()) {
       return false;
     }
+    clearBuffers();
 
     // next, we need to decompress the bytes
     // TODO - figure out if we need multiple dictionary pages, I believe it 
may be limited to one
@@ -168,7 +173,7 @@ final class PageReader {
         //TODO: Handle buffer allocation exception
         BytesInput bytesIn;
         ByteBuf 
uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size());
-        allocatedBuffers.add(uncompressedData);
+        allocatedDictionaryBuffers.add(uncompressedData);
         if( parentColumnReader.columnChunkMetaData.getCodec()== 
CompressionCodecName.UNCOMPRESSED) {
           dataReader.getPageAsBytesBuf(uncompressedData, 
pageHeader.compressed_page_size);
           
bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData,
@@ -229,6 +234,7 @@ final class PageReader {
     }
 
     pageDataByteArray = 
DrillBuf.wrapByteBuffer(currentPage.getBytes().toByteBuffer());
+    allocatedBuffers.add(pageDataByteArray);
 
     readPosInBytes = 0;
     if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
@@ -244,29 +250,29 @@ final class PageReader {
     }
     if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
       parentColumnReader.currDefLevel = -1;
-      if (!currentPage.getValueEncoding().usesDictionary()) {
-        parentColumnReader.usingDictionary = false;
-        definitionLevels = 
currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor,
 ValuesType.DEFINITION_LEVEL);
-        definitionLevels.initFromPage(currentPage.getValueCount(), 
pageDataByteArray.nioBuffer(), (int) readPosInBytes);
-        readPosInBytes = definitionLevels.getNextOffset();
-        if (parentColumnReader.columnDescriptor.getType() == 
PrimitiveType.PrimitiveTypeName.BOOLEAN) {
-          valueReader = 
currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor,
 ValuesType.VALUES);
-          valueReader.initFromPage(currentPage.getValueCount(), 
pageDataByteArray.nioBuffer(), (int) readPosInBytes);
-        }
-      } else {
-        parentColumnReader.usingDictionary = true;
-        definitionLevels = 
currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor,
 ValuesType.DEFINITION_LEVEL);
-        definitionLevels.initFromPage(currentPage.getValueCount(), 
pageDataByteArray.nioBuffer(), (int) readPosInBytes);
-        readPosInBytes = definitionLevels.getNextOffset();
-        // initialize two of the dictionary readers, one is for determining 
the lengths of each value, the second is for
-        // actually copying the values out into the vectors
-        dictionaryLengthDeterminingReader = new 
DictionaryValuesReader(dictionary);
-        
dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), 
pageDataByteArray.nioBuffer(), (int) readPosInBytes);
-        dictionaryValueReader = new DictionaryValuesReader(dictionary);
-        dictionaryValueReader.initFromPage(currentPage.getValueCount(), 
pageDataByteArray.nioBuffer(), (int) readPosInBytes);
-        this.parentColumnReader.usingDictionary = true;
+      definitionLevels = 
currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor,
 ValuesType.DEFINITION_LEVEL);
+      definitionLevels.initFromPage(currentPage.getValueCount(), 
pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+      readPosInBytes = definitionLevels.getNextOffset();
+      if ( ! currentPage.getValueEncoding().usesDictionary()) {
+        valueReader = 
currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor,
 ValuesType.VALUES);
+        valueReader.initFromPage(currentPage.getValueCount(), 
pageDataByteArray.nioBuffer(), (int) readPosInBytes);
       }
     }
+    if (parentColumnReader.columnDescriptor.getType() == 
PrimitiveType.PrimitiveTypeName.BOOLEAN) {
+      valueReader = 
currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor,
 ValuesType.VALUES);
+      valueReader.initFromPage(currentPage.getValueCount(), 
pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+    }
+    if (currentPage.getValueEncoding().usesDictionary()) {
+      // initialize two of the dictionary readers, one is for determining the 
lengths of each value, the second is for
+      // actually copying the values out into the vectors
+      dictionaryLengthDeterminingReader = new 
DictionaryValuesReader(dictionary);
+      
dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), 
pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+      dictionaryValueReader = new DictionaryValuesReader(dictionary);
+      dictionaryValueReader.initFromPage(currentPage.getValueCount(), 
pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+      parentColumnReader.usingDictionary = true;
+    } else {
+      parentColumnReader.usingDictionary = false;
+    }
     // readPosInBytes is used for actually reading the values after we 
determine how many will fit in the vector
     // readyToReadPosInBytes serves a similar purpose for the vector types 
where we must count up the values that will
     // fit one record at a time, such as for variable length data. Both 
operations must start in the same location after the
@@ -275,13 +281,26 @@ final class PageReader {
     return true;
   }
 
+  public void clearBuffers() {
+    for (ByteBuf b : allocatedBuffers) {
+      b.release();
+    }
+    allocatedBuffers.clear();
+  }
+
+  public void clearDictionaryBuffers() {
+    for (ByteBuf b : allocatedDictionaryBuffers) {
+      b.release();
+    }
+    allocatedDictionaryBuffers.clear();
+  }
+
   public void clear(){
     this.dataReader.clear();
     // Free all memory, including fixed length types. (Data is being copied 
for all types not just var length types)
     //if(!this.parentColumnReader.isFixedLength) {
-      for (ByteBuf b : allocatedBuffers) {
-        b.release();
-      }
+    clearBuffers();
+    clearDictionaryBuffers();
     //}
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java
deleted file mode 100644
index ad849b4..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- 
******************************************************************************/
-package org.apache.drill.exec.store.parquet.columnreaders;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.ValueVector;
-import parquet.column.ColumnDescriptor;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.schema.PrimitiveType;
-
-public class ParquetFixedWidthDictionaryReader extends ColumnReader{
-
-  ParquetFixedWidthDictionaryReader(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
-                                    ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, ValueVector v,
-                                    SchemaElement schemaElement) throws 
ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
-  }
-
-  @Override
-  public void readField(long recordsToReadInThisPass) {
-
-    recordsReadInThisIteration = 
Math.min(pageReader.currentPage.getValueCount()
-        - pageReader.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
-    int defLevel;
-    for (int i = 0; i < recordsReadInThisIteration; i++){
-      defLevel = pageReader.definitionLevels.readInteger();
-      // if the value is defined
-      if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
-        if (columnDescriptor.getType() == 
PrimitiveType.PrimitiveTypeName.INT64)
-          ((BigIntVector)valueVec).getMutator().set(i + 
valuesReadInCurrentPass,
-              pageReader.valueReader.readLong() );
-      }
-      // otherwise the value is skipped, because the bit vector indicating 
nullability is zero filled
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
new file mode 100644
index 0000000..bad6e6a
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -0,0 +1,133 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.Float4Vector;
+import org.apache.drill.exec.vector.Float8Vector;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+
+public class ParquetFixedWidthDictionaryReaders {
+
+  static class DictionaryIntReader extends FixedByteAlignedReader {
+
+    IntVector castedVector;
+
+    DictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
+                                ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, IntVector v,
+                                SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+      castedVector = v;
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+
+      recordsReadInThisIteration = 
Math.min(pageReader.currentPage.getValueCount()
+          - pageReader.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
+
+      if (usingDictionary) {
+        for (int i = 0; i < recordsReadInThisIteration; i++){
+          castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readInteger());
+        }
+      }
+    }
+  }
+
+  static class DictionaryBigIntReader extends FixedByteAlignedReader {
+
+    BigIntVector castedVector;
+
+    DictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
+                                   ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, BigIntVector v,
+                                   SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+      castedVector = v;
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+
+      recordsReadInThisIteration = 
Math.min(pageReader.currentPage.getValueCount()
+          - pageReader.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
+
+      for (int i = 0; i < recordsReadInThisIteration; i++){
+        try {
+        castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readLong());
+        } catch ( Exception ex) {
+          throw ex;
+        }
+      }
+    }
+  }
+
+  static class DictionaryFloat4Reader extends FixedByteAlignedReader {
+
+    Float4Vector castedVector;
+
+    DictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
+                                   ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, Float4Vector v,
+                                   SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+      castedVector = v;
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+      recordsReadInThisIteration = 
Math.min(pageReader.currentPage.getValueCount()
+          - pageReader.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
+
+      for (int i = 0; i < recordsReadInThisIteration; i++){
+        castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readFloat());
+      }
+    }
+  }
+
+  static class DictionaryFloat8Reader extends FixedByteAlignedReader {
+
+    Float8Vector castedVector;
+
+    DictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor,
+                                   ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, Float8Vector v,
+                                   SchemaElement schemaElement) throws 
ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+      castedVector = v;
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+      recordsReadInThisIteration = 
Math.min(pageReader.currentPage.getValueCount()
+          - pageReader.valuesRead, recordsToReadInThisPass - 
valuesReadInCurrentPass);
+
+      for (int i = 0; i < recordsReadInThisIteration; i++){
+        castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, 
pageReader.dictionaryValueReader.readDouble());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index c72e750..f3d9e2c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -60,6 +60,7 @@ import parquet.format.SchemaElement;
 import parquet.format.converter.ParquetMetadataConverter;
 import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ParquetFileWriter;
+import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.schema.PrimitiveType;
@@ -110,6 +111,7 @@ public class ParquetRecordReader extends 
AbstractRecordReader {
 
   private final CodecFactoryExposer codecFactoryExposer;
   int rowGroupIndex;
+  long totalRecordsRead;
 
   public ParquetRecordReader(FragmentContext fragmentContext, //
                              String path, //
@@ -223,6 +225,10 @@ public class ParquetRecordReader extends 
AbstractRecordReader {
 //    ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
     FileMetaData fileMetaData;
 
+    logger.debug("Reading row group({}) with {} records in file {}.", 
rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(),
+        hadoopPath.toUri().getPath());
+    totalRecordsRead = 0;
+
     // TODO - figure out how to deal with this better once we add nested 
reading, note also look where this map is used below
     // store a map from column name to converted types if they are non-null
     HashMap<String, SchemaElement> schemaElements = new HashMap<>();
@@ -247,13 +253,11 @@ public class ParquetRecordReader extends 
AbstractRecordReader {
         if (column.getMaxRepetitionLevel() > 0) {
           allFieldsFixedLength = false;
         }
-        // There is not support for the fixed binary type yet in parquet, 
leaving a task here as a reminder
-        // TODO - implement this when the feature is added upstream
-          if (column.getType() == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){
-              bitWidthAllFixedFields += se.getType_length() * 8;
-          } else {
-            bitWidthAllFixedFields += getTypeLengthInBits(column.getType());
-          }
+        if (column.getType() == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){
+            bitWidthAllFixedFields += se.getType_length() * 8;
+        } else {
+          bitWidthAllFixedFields += getTypeLengthInBits(column.getType());
+        }
       } else {
         allFieldsFixedLength = false;
       }
@@ -287,13 +291,15 @@ public class ParquetRecordReader extends 
AbstractRecordReader {
         v = output.addField(field, (Class<? extends ValueVector>) 
TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
         if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
           if (column.getMaxRepetitionLevel() > 0) {
-            ColumnReader dataReader = 
ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, 
columnChunkMetaData, recordsPerBatch,
+            ColumnReader dataReader = 
ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
+                column, columnChunkMetaData, recordsPerBatch,
                 ((RepeatedFixedWidthVector) v).getMutator().getDataVector(), 
schemaElement);
             varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader,
                 getTypeLengthInBits(column.getType()), -1, column, 
columnChunkMetaData, false, v, schemaElement));
           }
           else {
-            
columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, 
fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v,
+            
columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, 
fieldFixedLength,
+                column, columnChunkMetaData, recordsPerBatch, v,
                 schemaElement));
           }
         } else {
@@ -393,6 +399,7 @@ public class ParquetRecordReader extends 
AbstractRecordReader {
           vv.getMutator().setValueCount( (int) recordsToRead);
         }
         mockRecordsRead += recordsToRead;
+        totalRecordsRead += recordsToRead;
         return (int) recordsToRead;
       }
 
@@ -418,6 +425,8 @@ public class ParquetRecordReader extends 
AbstractRecordReader {
         }
       }
 
+
+      totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass();
       return firstColumnStatus.getRecordsReadInCurrentPass();
     } catch (IOException e) {
       throw new DrillRuntimeException(e);
@@ -426,6 +435,10 @@ public class ParquetRecordReader extends 
AbstractRecordReader {
 
   @Override
   public void cleanup() {
+    logger.debug("Read {} records out of row group({}) in file '{}'", 
totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
+    // enable this for debugging when it is know that a whole file will be read
+    // limit kills upstream operators once it has enough records, so this 
assert will fail
+//    assert totalRecordsRead == 
footer.getBlocks().get(rowGroupIndex).getRowCount();
     for (ColumnReader column : columnStatuses) {
       column.clear();
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
index 5bba6be..481b289 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -95,8 +95,7 @@ public class ParquetToDrillTypeConverter {
           case FIXED_LEN_BYTE_ARRAY:
             if (convertedType == null) {
               checkArgument(length > 0, "A length greater than zero must be 
provided for a FixedBinary type.");
-              return 
TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
-                  .setWidth(length).setMode(mode).build();
+              return 
TypeProtos.MajorType.newBuilder().setMinorType(MinorType.VARBINARY).setMode(mode).build();
             } else if (convertedType == ConvertedType.DECIMAL) {
               return 
Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.OPTIONAL, 
schemaElement.getScale(), schemaElement.getPrecision());
             }
@@ -159,8 +158,7 @@ public class ParquetToDrillTypeConverter {
           case FIXED_LEN_BYTE_ARRAY:
             if (convertedType == null) {
               checkArgument(length > 0, "A length greater than zero must be 
provided for a FixedBinary type.");
-              return 
TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
-                  .setWidth(length).setMode(mode).build();
+              return 
TypeProtos.MajorType.newBuilder().setMinorType(MinorType.VARBINARY).setMode(mode).build();
             } else if (convertedType == ConvertedType.DECIMAL) {
               return 
Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, 
schemaElement.getScale(), schemaElement.getPrecision());
             }
@@ -223,8 +221,7 @@ public class ParquetToDrillTypeConverter {
           case FIXED_LEN_BYTE_ARRAY:
             if (convertedType == null) {
               checkArgument(length > 0, "A length greater than zero must be 
provided for a FixedBinary type.");
-              return 
TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
-                  .setWidth(length).setMode(mode).build();
+              return 
TypeProtos.MajorType.newBuilder().setMinorType(MinorType.VARBINARY).setMode(mode).build();
             } else if (convertedType == ConvertedType.DECIMAL) {
               return 
Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, 
schemaElement.getScale(), schemaElement.getPrecision());
             }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
index ecfa110..2f3711d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
@@ -184,13 +184,13 @@ public class VarLengthColumnReaders {
       if(index >= varCharVector.getValueCapacity()) return false;
 
       if (usingDictionary) {
-        DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer());
+        DrillBuf b = 
DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer());
         int st=0;
-        int len=currDictVal.length();
+        int len=currDictValToWrite.length();
         VarCharHolder holder = new VarCharHolder();
         holder.buffer=b;
         holder.start=0;
-        holder.end=currDictVal.length();
+        holder.end=currDictValToWrite.length();
         success = varCharVector.getMutator().setSafe(index, holder);
       }
       else {
@@ -230,8 +230,8 @@ public class VarLengthColumnReaders {
       if(index >= vector.getValueCapacity()) return false;
 
       if (usingDictionary) {
-        DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer());
-        success = mutator.setSafe(index, 1, 0, currDictVal.length(), b);
+        DrillBuf b = 
DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer());
+        success = mutator.setSafe(index, 1, 0, currDictValToWrite.length(), b);
       }
       else {
         success = mutator.setSafe(index, 1, start, start+length, value);
@@ -263,13 +263,13 @@ public class VarLengthColumnReaders {
       if(index >= varBinaryVector.getValueCapacity()) return false;
 
       if (usingDictionary) {
-        DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer());
+        DrillBuf b = 
DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer());
         int st=0;
-        int len=currDictVal.length();
+        int len=currDictValToWrite.length();
         VarBinaryHolder holder = new VarBinaryHolder();
         holder.buffer=b;
         holder.start=0;
-        holder.end=currDictVal.length();
+        holder.end=currDictValToWrite.length();
         success = varBinaryVector.getMutator().setSafe(index, holder);
       }
       else {
@@ -307,11 +307,11 @@ public class VarLengthColumnReaders {
       if(index >= nullableVarBinaryVector.getValueCapacity()) return false;
 
       if (usingDictionary) {
-        DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer());
+        DrillBuf b = 
DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer());
         NullableVarBinaryHolder holder = new NullableVarBinaryHolder();
         holder.buffer=b;
         holder.start=0;
-        holder.end=currDictVal.length();
+        holder.end=currDictValToWrite.length();
         holder.isSet=1;
         success = nullableVarBinaryVector.getMutator().setSafe(index, holder);
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
index 829b44a..4f02c70 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
@@ -82,10 +82,17 @@ public abstract class VarLengthValuesColumn<V extends 
ValueVector> extends VarLe
 
   protected boolean readAndStoreValueSizeInformation() throws IOException {
     // re-purposing this field here for length in BYTES to prevent repetitive 
multiplication/division
-    try {
+    if (usingDictionary) {
+      if (currLengthDeterminingDictVal == null) {
+        currLengthDeterminingDictVal = 
pageReader.dictionaryLengthDeterminingReader.readBytes();
+      }
+      currDictValToWrite = currLengthDeterminingDictVal;
+      // re-purposing  this field here for length in BYTES to prevent 
repetitive multiplication/division
+      dataTypeLengthInBits = currLengthDeterminingDictVal.length();
+    }
+    else {
+      // re-purposing  this field here for length in BYTES to prevent 
repetitive multiplication/division
       dataTypeLengthInBits = pageReader.pageDataByteArray.getInt((int) 
pageReader.readyToReadPosInBytes);
-    } catch (Throwable t) {
-      throw t;
     }
 
     // this should not fail

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NullableVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NullableVector.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NullableVector.java
new file mode 100644
index 0000000..92f60d6
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NullableVector.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+public interface NullableVector extends ValueVector{
+
+  public ValueVector getValuesVector();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index fb0d1df..cdae265 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -224,6 +224,21 @@ public class TestParquetWriter extends BaseTestQuery {
     runTestAndValidate(selection, validateSelection, inputTable, 
"foodmart_employee_parquet");
   }
 
+  @Test
+  public void testParquetRead() throws Exception {
+    test("alter system set `store.parquet.use_new_reader` = true");
+    List<QueryResultBatch> expected = testSqlWithResults("select * from 
dfs.`/tmp/voter`");
+    test("alter system set `store.parquet.use_new_reader` = false");
+    List<QueryResultBatch> results = testSqlWithResults("select * from 
dfs.`/tmp/voter`");
+    compareResults(expected, results);
+    for (QueryResultBatch result : results) {
+      result.release();
+    }
+    for (QueryResultBatch result : expected) {
+      result.release();
+    }
+  }
+
   public void runTestAndValidate(String selection, String validationSelection, 
String inputTable, String outputFile) throws Exception {
 
     Path path = new Path("/tmp/" + outputFile);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index e584210..def6dac 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -79,6 +79,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
+@Ignore
 public class ParquetRecordReaderTest extends BaseTestQuery{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class);
 
@@ -125,7 +126,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
   }
 
   @Test
-  @Ignore
+
   public void testDictionaryError() throws Exception {
     String readEntries;
     readEntries = "\"/tmp/lineitem_null_dict.parquet\"";
@@ -136,7 +137,31 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
     testFull(QueryType.SQL, "select L_RECEIPTDATE from 
dfs.`/tmp/lineitem_null_dict.parquet`", "", 1, 1, 100000, false);
   }
 
-  @Ignore
+
+  @Test
+  public void testFixedBinary() throws Exception {
+    String readEntries = "\"/tmp/drilltest/fixed_binary.parquet\"";
+
+    String planText = 
Files.toString(FileUtils.getResourceAsFile("/parquet/parquet_scan_screen_read_entry_replace.json"),
 Charsets.UTF_8).replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries);
+    testParquetFullEngineLocalText(planText, fileName, 1, 1, 1000000, false);
+  }
+
+  @Test
+  public void testNonNullableDictionaries() throws Exception {
+    testFull(QueryType.SQL, "select * from 
dfs.`/tmp/drilltest/non_nullable_dictionary.parquet`", "", 1, 1, 30000000, 
false);
+  }
+
+  @Test
+  public void testReadVoter() throws Exception {
+    testFull(QueryType.SQL, "select * from dfs.`/tmp/voter.parquet`", "", 1, 
1, 1000, false);
+  }
+
+  @Test
+  public void testDrill_1314() throws Exception {
+    testFull(QueryType.SQL, "select l_partkey " +
+        "from dfs.`/tmp/drill_1314.parquet`", "", 1,1, 10000, false);
+  }
+
   @Test
   public void testDictionaryError_419() throws Exception {
     testFull(QueryType.SQL, "select c_address from 
dfs.`/tmp/customer_snappyimpala_drill_419.parquet`", "", 1, 1, 150000, false);
@@ -149,19 +174,18 @@ public class ParquetRecordReaderTest extends 
BaseTestQuery{
 
 
   @Test
-  @Ignore
+
   public void testNonExistentColumnLargeFile() throws Exception {
     testFull(QueryType.SQL, "select non_existent_column, non_existent_col_2 
from dfs.`/tmp/customer.dict.parquet`", "", 1, 1, 150000, false);
   }
 
   @Test
-  @Ignore
+
   public void testNonExistentColumnsSomePresentColumnsLargeFile() throws 
Exception {
     testFull(QueryType.SQL, "select cust_key, address,  non_existent_column, 
non_existent_col_2 from dfs.`/tmp/customer.dict.parquet`", "", 1, 1, 150000, 
false);
   }
 
   @Test
-  @Ignore
   public void testTPCHPerformace_SF1() throws Exception {
     testFull(QueryType.SQL, "select * from 
dfs.`/tmp/orders_part-m-00001.parquet`", "", 1, 1, 150000, false);
   }
@@ -314,7 +338,6 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
   // TODO - Test currently marked ignore to prevent breaking of the build 
process, requires a binary file that was
   // generated using pig. Will need to find a good place to keep files like 
this.
   // For now I will upload it to the JIRA as an attachment.
-  @Ignore
   @Test
   public void testNullableColumns() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
@@ -324,13 +347,13 @@ public class ParquetRecordReaderTest extends 
BaseTestQuery{
     testParquetFullEngineEventBased(false, "/parquet/parquet_nullable.json", 
"/tmp/nullable_test.parquet", 1, props);
   }
 
-  @Ignore
+
   @Test
   /**
    * Tests the reading of nullable var length columns, runs the tests twice, 
once on a file that has
    * a converted type of UTF-8 to make sure it can be read
    */
-public void testNullableColumnsVarLen() throws Exception {
+  public void testNullableColumnsVarLen() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
     ParquetTestProperties props = new ParquetTestProperties(1, 300000, 
DEFAULT_BYTES_PER_PAGE, fields);
     byte[] val = {'b'};
@@ -351,7 +374,7 @@ public void testNullableColumnsVarLen() throws Exception {
 
   }
 
-  @Ignore
+
   @Test
   public void testFileWithNulls() throws Exception {
     HashMap<String, FieldInfo> fields3 = new HashMap<>();
@@ -364,7 +387,7 @@ public void testNullableColumnsVarLen() throws Exception {
 
   }
 
-  @Ignore
+
   @Test
   public void testDictionaryEncoding() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
@@ -408,7 +431,7 @@ public void testNullableColumnsVarLen() throws Exception {
         "/tmp/test.parquet", i, props);
   }
 
-  @Ignore
+
   @Test
   public void testReadError_Drill_901() throws Exception {
     // select cast( L_COMMENT as varchar) from  
dfs_test.`/tmp/drilltest/employee_parquet`
@@ -418,7 +441,7 @@ public void testNullableColumnsVarLen() throws Exception {
         "unused, no file is generated", 1, props, QueryType.PHYSICAL);
   }
 
-  @Ignore
+
   @Test
   public void testReadError_Drill_839() throws Exception {
     // select cast( L_COMMENT as varchar) from  
dfs.`/tmp/drilltest/employee_parquet`
@@ -429,7 +452,7 @@ public void testNullableColumnsVarLen() throws Exception {
         "unused, no file is generated", 1, props, QueryType.LOGICAL);
   }
 
-  @Ignore
+
   @Test
   public void testReadBug_Drill_418() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
@@ -441,7 +464,7 @@ public void testNullableColumnsVarLen() throws Exception {
   }
 
   // requires binary file generated by pig from TPCH data, also have to 
disable assert where data is coming in
-  @Ignore
+
   @Test
   public void testMultipleRowGroupsAndReadsPigError() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
@@ -459,7 +482,12 @@ public void testNullableColumnsVarLen() throws Exception {
         "unused, no file is generated", 1, props, QueryType.LOGICAL);
   }
 
-  @Ignore
+  @Test
+  public void test958_sql() throws Exception {
+    testFull(QueryType.SQL, "select ss_ext_sales_price from 
dfs.`/tmp/store_sales`", "", 1, 1, 30000000, false);
+  }
+
+
   @Test
   public void drill_958bugTest() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c1fcb652/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
----------------------------------------------------------------------
diff --git 
a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java 
b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
index 5aee392..cad5b4d 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestFunctionsQuery.java
@@ -538,6 +538,7 @@ public class TestFunctionsQuery {
             "SIGN_INT=1\n");
   }
 
+  @Ignore
   @Test
   public void testDateTrunc() throws Exception {
     String query = "select "

Reply via email to