[ 
https://issues.apache.org/jira/browse/DRILL-6353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511965#comment-16511965
 ] 

ASF GitHub Bot commented on DRILL-6353:
---------------------------------------

ilooner closed pull request #1259: DRILL-6353: Upgrade Parquet MR dependencies
URL: https://github.com/apache/drill/pull/1259
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, GitHub no longer shows
its diff once it has been merged, so the full diff is reproduced below:

diff --git a/contrib/storage-hive/hive-exec-shade/pom.xml 
b/contrib/storage-hive/hive-exec-shade/pom.xml
index 6f511adf71..98fd4b8150 100644
--- a/contrib/storage-hive/hive-exec-shade/pom.xml
+++ b/contrib/storage-hive/hive-exec-shade/pom.xml
@@ -31,6 +31,20 @@
   <packaging>jar</packaging>
   <name>contrib/hive-storage-plugin/hive-exec-shaded</name>
 
+  <properties>
+    <hive.parquet.version>1.8.3</hive.parquet.version>
+  </properties>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-hadoop-bundle</artifactId>
+        <version>${hive.parquet.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.hive</groupId>
@@ -68,11 +82,6 @@
         </exclusion>
       </exclusions>
     </dependency>
-    <!--Once newer hive-exec version leverages parquet-column 1.9.0, this 
dependency can be deleted -->
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-column</artifactId>
-    </dependency>
   </dependencies>
 
   <build>
@@ -83,7 +92,7 @@
           <artifactSet>
             <includes>
               <include>org.apache.hive:hive-exec</include>
-              <include>org.apache.parquet:parquet-column</include>
+              <include>org.apache.parquet:parquet-hadoop-bundle</include>
               <include>commons-codec:commons-codec</include>
               <include>com.fasterxml.jackson.core:jackson-databind</include>
               <include>com.fasterxml.jackson.core:jackson-annotations</include>
@@ -117,6 +126,10 @@
               <pattern>org.apache.parquet.</pattern>
               <shadedPattern>hive.org.apache.parquet.</shadedPattern>
             </relocation>
+            <relocation>
+              <pattern>shaded.parquet.</pattern>
+              <shadedPattern>hive.shaded.parquet.</shadedPattern>
+            </relocation>
             <relocation>
               <pattern>org.apache.avro.</pattern>
               <shadedPattern>hive.org.apache.avro.</shadedPattern>
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 2205c2f4cd..7701e76165 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -249,92 +249,17 @@
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-hadoop</artifactId>
-      <version>${parquet.version}</version>
       <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-format</artifactId>
-      <version>2.3.0-incubating</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-common</artifactId>
       <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-jackson</artifactId>
-      <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-encoding</artifactId>
-      <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-generator</artifactId>
-      <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>javax.inject</groupId>
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
index 9e561ad364..ebceefb435 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
@@ -113,7 +113,7 @@ public boolean canDrop(RangeExprEvaluator<C> evaluator) {
       // can drop when left's max < right's min, or right's max < left's min
       final C leftMin = leftStat.genericGetMin();
       final C rightMin = rightStat.genericGetMin();
-      return leftStat.genericGetMax().compareTo(rightMin) < 0 || 
rightStat.genericGetMax().compareTo(leftMin) < 0;
+      return (leftStat.compareMaxToValue(rightMin) < 0) || 
(rightStat.compareMaxToValue(leftMin) < 0);
     }) {
       @Override
       public String toString() {
@@ -132,7 +132,7 @@ public String toString() {
     return new ParquetComparisonPredicate<C>(left, right, (leftStat, 
rightStat) -> {
       // can drop when left's max <= right's min.
       final C rightMin = rightStat.genericGetMin();
-      return leftStat.genericGetMax().compareTo(rightMin) <= 0;
+      return leftStat.compareMaxToValue(rightMin) <= 0;
     });
   }
 
@@ -146,7 +146,7 @@ public String toString() {
     return new ParquetComparisonPredicate<C>(left, right, (leftStat, 
rightStat) -> {
       // can drop when left's max < right's min.
       final C rightMin = rightStat.genericGetMin();
-      return leftStat.genericGetMax().compareTo(rightMin) < 0;
+      return leftStat.compareMaxToValue(rightMin) < 0;
     });
   }
 
@@ -160,7 +160,7 @@ public String toString() {
     return new ParquetComparisonPredicate<C>(left, right, (leftStat, 
rightStat) -> {
       // can drop when right's max <= left's min.
       final C leftMin = leftStat.genericGetMin();
-      return rightStat.genericGetMax().compareTo(leftMin) <= 0;
+      return rightStat.compareMaxToValue(leftMin) <= 0;
     });
   }
 
@@ -173,7 +173,7 @@ public String toString() {
     return new ParquetComparisonPredicate<C>(left, right, (leftStat, 
rightStat) -> {
       // can drop when right's max < left's min.
       final C leftMin = leftStat.genericGetMin();
-      return rightStat.genericGetMax().compareTo(leftMin) < 0;
+      return rightStat.compareMaxToValue(leftMin) < 0;
     });
   }
 
@@ -188,8 +188,8 @@ public String toString() {
       // can drop when there is only one unique value.
       final C leftMax = leftStat.genericGetMax();
       final C rightMax = rightStat.genericGetMax();
-      return leftStat.genericGetMin().compareTo(leftMax) == 0 && 
rightStat.genericGetMin().compareTo(rightMax) == 0 &&
-          leftStat.genericGetMax().compareTo(rightMax) == 0;
+      return leftStat.compareMinToValue(leftMax) == 0 && 
rightStat.compareMinToValue(rightMax) == 0 &&
+          leftStat.compareMaxToValue(rightMax) == 0;
     });
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
index 9b041020f6..547dc06704 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
@@ -23,6 +23,7 @@
 import org.apache.drill.common.expression.TypedFieldExpr;
 import org.apache.drill.common.expression.visitors.ExprVisitor;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.parquet.column.statistics.BooleanStatistics;
 import org.apache.parquet.column.statistics.Statistics;
 
 import java.util.ArrayList;
@@ -114,7 +115,7 @@ public boolean canDrop(RangeExprEvaluator<C> evaluator) {
   private static LogicalExpression createIsTruePredicate(LogicalExpression 
expr) {
     return new ParquetIsPredicate<Boolean>(expr,
         //if max value is not true or if there are all nulls  -> canDrop
-        (exprStat, evaluator) -> 
!exprStat.genericGetMax().equals(Boolean.TRUE) || isAllNulls(exprStat, 
evaluator.getRowCount())
+        (exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() || 
isAllNulls(exprStat, evaluator.getRowCount())
     );
   }
 
@@ -124,7 +125,7 @@ private static LogicalExpression 
createIsTruePredicate(LogicalExpression expr) {
   private static LogicalExpression createIsFalsePredicate(LogicalExpression 
expr) {
     return new ParquetIsPredicate<Boolean>(expr,
         //if min value is not false or if there are all nulls  -> canDrop
-        (exprStat, evaluator) -> 
!exprStat.genericGetMin().equals(Boolean.FALSE) || isAllNulls(exprStat, 
evaluator.getRowCount())
+        (exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() || 
isAllNulls(exprStat, evaluator.getRowCount())
     );
   }
 
@@ -134,7 +135,7 @@ private static LogicalExpression 
createIsFalsePredicate(LogicalExpression expr)
   private static LogicalExpression createIsNotTruePredicate(LogicalExpression 
expr) {
     return new ParquetIsPredicate<Boolean>(expr,
         //if min value is not false or if there are no nulls  -> canDrop
-        (exprStat, evaluator) -> 
!exprStat.genericGetMin().equals(Boolean.FALSE) && hasNoNulls(exprStat)
+        (exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() && 
hasNoNulls(exprStat)
     );
   }
 
@@ -144,7 +145,7 @@ private static LogicalExpression 
createIsNotTruePredicate(LogicalExpression expr
   private static LogicalExpression createIsNotFalsePredicate(LogicalExpression 
expr) {
     return new ParquetIsPredicate<Boolean>(expr,
         //if max value is not true or if there are no nulls  -> canDrop
-        (exprStat, evaluator) -> 
!exprStat.genericGetMax().equals(Boolean.TRUE) && hasNoNulls(exprStat)
+        (exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() && 
hasNoNulls(exprStat)
     );
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
index 7ff1036443..f804a7b06f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
@@ -43,7 +43,7 @@ static boolean isNullOrEmpty(Statistics stat) {
    *          False if at least one row is not null.
    */
   static boolean isAllNulls(Statistics stat, long rowCount) {
-    return stat.getNumNulls() == rowCount;
+    return stat.isNumNullsSet() && stat.getNumNulls() == rowCount;
   }
 
   /**
@@ -54,7 +54,7 @@ static boolean isAllNulls(Statistics stat, long rowCount) {
    *          False if the parquet file hasn't nulls.
    */
   static boolean hasNoNulls(Statistics stat) {
-    return stat.getNumNulls() == 0;
+    return !stat.isNumNullsSet() || stat.getNumNulls() == 0;
   }
 
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index 6a320b85b3..dc09ce1b69 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -33,11 +33,12 @@
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 
@@ -50,6 +51,9 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.drill.exec.store.parquet.metadata.Metadata.PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED;
+import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
 public abstract class AbstractParquetScanBatchCreator {
 
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
@@ -146,11 +150,15 @@ protected ScanBatch getBatch(ExecutorFragmentContext 
context, AbstractParquetRow
   protected abstract AbstractDrillFileSystemManager 
getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager 
optionManager);
 
   private ParquetMetadata readFooter(Configuration conf, String path) throws 
IOException {
-    Configuration newConf = new Configuration(conf);
+    conf = new Configuration(conf);
     conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
     conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
     conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
-    return ParquetFileReader.readFooter(newConf, new Path(path), 
ParquetMetadataConverter.NO_FILTER);
+    conf.setBoolean(PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED, true);
+    ParquetReadOptions options = 
ParquetReadOptions.builder().withMetadataFilter(NO_FILTER).build();
+    try (ParquetFileReader reader = 
ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf), 
options)) {
+      return reader.getFooter();
+    }
   }
 
   private boolean isComplex(ParquetMetadata footer) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
index dcd40cf910..79294daea7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
@@ -21,14 +21,13 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.Util;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
 
 public class ColumnDataReader {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class);
@@ -58,11 +57,7 @@ public BytesInput getPageAsBytesInput(int pageLength) throws 
IOException{
 
   public void loadPage(DrillBuf target, int pageLength) throws IOException {
     target.clear();
-    ByteBuffer directBuffer = target.nioBuffer(0, pageLength);
-    int lengthLeftToRead = pageLength;
-    while (lengthLeftToRead > 0) {
-      lengthLeftToRead -= CompatibilityUtil.getBuf(input, directBuffer, 
lengthLeftToRead);
-    }
+    HadoopStreams.wrap(input).read(target.nioBuffer(0, pageLength));
     target.writerIndex(pageLength);
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
index ea34c7d8b8..d1562c48c0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
@@ -42,6 +42,7 @@
 import com.google.common.base.Preconditions;
 
 import static 
org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
+import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 
 public class FooterGatherer {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FooterGatherer.class);
@@ -160,7 +161,8 @@ public static Footer readFooter(final Configuration config, 
final FileStatus sta
         footerBytes = ArrayUtils.subarray(footerBytes, start, start + size);
       }
 
-      ParquetMetadata metadata = 
ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(new 
ByteArrayInputStream(footerBytes));
+      final ByteArrayInputStream from = new ByteArrayInputStream(footerBytes);
+      ParquetMetadata metadata = 
ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(from, 
NO_FILTER);
       Footer footer = new Footer(status.getPath(), metadata);
       return footer;
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
index 09f1b26e24..ba6aac9431 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
@@ -17,10 +17,11 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Map;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.OperatorContext;
@@ -30,44 +31,42 @@
 /**
  * {@link ByteBufferAllocator} implementation that uses Drill's {@link 
BufferAllocator} to allocate and release
  * {@link ByteBuffer} objects.<br>
- * To properly release an allocated {@link ByteBuf}, this class keeps track of 
it's corresponding {@link ByteBuffer}
+ * To properly release an allocated {@link DrillBuf}, this class keeps track 
of it's corresponding {@link ByteBuffer}
  * that was passed to the Parquet library.
  */
 public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
 
   private final BufferAllocator allocator;
-  private final HashMap<Key, ByteBuf> allocatedBuffers = new HashMap<>();
+  private final Map<ByteBuffer, DrillBuf> allocatedBuffers = new 
IdentityHashMap<>();
 
-  public ParquetDirectByteBufferAllocator(OperatorContext o){
-    allocator = o.getAllocator();
+  public ParquetDirectByteBufferAllocator(OperatorContext o) {
+    this(o.getAllocator());
   }
 
   public ParquetDirectByteBufferAllocator(BufferAllocator allocator) {
     this.allocator = allocator;
   }
 
-
   @Override
   public ByteBuffer allocate(int sz) {
-    ByteBuf bb = allocator.buffer(sz);
-    ByteBuffer b = bb.nioBuffer(0, sz);
-    final Key key = new Key(b);
-    allocatedBuffers.put(key, bb);
-    logger.debug("ParquetDirectByteBufferAllocator: Allocated {} bytes. 
Allocated ByteBuffer id: {}", sz, key.hash);
-    return b;
+    DrillBuf drillBuf = allocator.buffer(sz);
+    ByteBuffer byteBuffer = drillBuf.nioBuffer(0, sz);
+    allocatedBuffers.put(byteBuffer, drillBuf);
+    logger.debug("{}: Allocated {} bytes. Allocated DrillBuf with id {} and 
ByteBuffer {}", this, sz, drillBuf.getId(), 
System.identityHashCode(byteBuffer));
+    return byteBuffer;
   }
 
   @Override
-  public void release(ByteBuffer b) {
-    final Key key = new Key(b);
-    final ByteBuf bb = allocatedBuffers.get(key);
+  public void release(ByteBuffer byteBuffer) {
+    final DrillBuf drillBuf = allocatedBuffers.remove(byteBuffer);
     // The ByteBuffer passed in may already have been freed or not allocated 
by this allocator.
     // If it is not found in the allocated buffers, do nothing
-    if(bb != null) {
-      logger.debug("ParquetDirectByteBufferAllocator: Freed byte buffer. 
Allocated ByteBuffer id: {}", key.hash);
-      bb.release();
-      allocatedBuffers.remove(key);
+    if (drillBuf != null) {
+      logger.debug("{}: Freed DrillBuf with id {} and ByteBuffer {}", this, 
drillBuf.getId(), System.identityHashCode(byteBuffer));
+      drillBuf.release();
+    } else {
+      logger.warn("{}: ByteBuffer {} is not present", this, 
System.identityHashCode(byteBuffer));
     }
   }
 
@@ -75,41 +74,4 @@ public void release(ByteBuffer b) {
   public boolean isDirect() {
     return true;
   }
-
-  /**
-   * ByteBuffer wrapper that computes a fixed hashcode.
-   * <br><br>
-   * Parquet only handles {@link ByteBuffer} objects, so we need to use them 
as keys to keep track of their corresponding
-   * {@link ByteBuf}, but {@link ByteBuffer} is mutable and it can't be used 
as a {@link HashMap} key as it is.<br>
-   * This class solves this by providing a fixed hashcode for {@link 
ByteBuffer} and uses reference equality in case
-   * of collisions (we don't need to compare the content of {@link ByteBuffer} 
because the object passed to
-   * {@link #release(ByteBuffer)} will be the same object returned from a 
previous {@link #allocate(int)}.
-   */
-  private class Key {
-    final int hash;
-    final ByteBuffer buffer;
-
-    Key(final ByteBuffer buffer) {
-      this.buffer = buffer;
-      // remember, we can't use buffer.hashCode()
-      this.hash = System.identityHashCode(buffer);
-    }
-
-    @Override
-    public int hashCode() {
-      return hash;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (!(obj instanceof Key)) {
-        return false;
-      }
-      final Key key = (Key) obj;
-      return hash == key.hash && buffer == key.buffer;
-    }
-  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 0e40c9e360..0917926608 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -54,8 +54,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore;
 import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -241,8 +243,15 @@ private void newSchema() throws IOException {
     // once PARQUET-1006 will be resolved
     pageStore = new 
ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, 
initialSlabSize,
         pageSize, new ParquetDirectByteBufferAllocator(oContext));
-    store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, 
enableDictionary,
-        writerVersion, new ParquetDirectByteBufferAllocator(oContext));
+    ParquetProperties parquetProperties = ParquetProperties.builder()
+        .withPageSize(pageSize)
+        .withDictionaryEncoding(enableDictionary)
+        .withDictionaryPageSize(initialPageBufferSize)
+        .withWriterVersion(writerVersion)
+        .withAllocator(new ParquetDirectByteBufferAllocator(oContext))
+        .withValuesWriterFactory(new DefaultV1ValuesWriterFactory())
+        .build();
+    store = new ColumnWriteStoreV1(pageStore, parquetProperties);
     MessageColumnIO columnIO = new 
ColumnIOFactory(false).getColumnIO(this.schema);
     consumer = columnIO.getRecordWriter(store);
     setUp(schema, consumer);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index bf75695b6b..01d06444b4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import io.netty.buffer.ByteBufUtil;
 import org.apache.drill.exec.util.filereader.BufferedDirectBufInputStream;
@@ -30,6 +31,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.Encoding;
@@ -250,7 +252,7 @@ private DrillBuf readPage(PageHeader pageHeader, int 
compressedSize, int uncompr
   }
 
   public static BytesInput asBytesInput(DrillBuf buf, int offset, int length) 
throws IOException {
-    return BytesInput.from(buf.nioBuffer(offset, length), 0, length);
+    return BytesInput.from(buf.nioBuffer(offset, length));
   }
 
 
@@ -319,41 +321,44 @@ public boolean next() throws IOException {
 
     byteLength = pageHeader.uncompressed_page_size;
 
-    final ByteBuffer pageDataBuffer = pageData.nioBuffer(0, 
pageData.capacity());
+    final ByteBufferInputStream in = 
ByteBufferInputStream.wrap(pageData.nioBuffer(0, pageData.capacity()));
 
     readPosInBytes = 0;
     if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
       repetitionLevels = 
rlEncoding.getValuesReader(parentColumnReader.columnDescriptor, 
ValuesType.REPETITION_LEVEL);
-      repetitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) 
readPosInBytes);
+      repetitionLevels.initFromPage(currentPageCount, in);
       // we know that the first value will be a 0, at the end of each list of 
repeated values we will hit another 0 indicating
       // a new record, although we don't know the length until we hit it (and 
this is a one way stream of integers) so we
       // read the first zero here to simplify the reading processes, and start 
reading the first value the same as all
       // of the rest. Effectively we are 'reading' the non-existent value in 
front of the first allowing direct access to
       // the first list of repetition levels
-      readPosInBytes = repetitionLevels.getNextOffset();
+      readPosInBytes = in.position();
       repetitionLevels.readInteger();
     }
-    if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
+    if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0) {
       parentColumnReader.currDefLevel = -1;
       definitionLevels = 
dlEncoding.getValuesReader(parentColumnReader.columnDescriptor, 
ValuesType.DEFINITION_LEVEL);
-      definitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) 
readPosInBytes);
-      readPosInBytes = definitionLevels.getNextOffset();
+      definitionLevels.initFromPage(currentPageCount, in);
+      readPosInBytes = in.position();
       if (!valueEncoding.usesDictionary()) {
         valueReader = 
valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, 
ValuesType.VALUES);
-        valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) 
readPosInBytes);
+        valueReader.initFromPage(currentPageCount, in);
       }
     }
-    if (parentColumnReader.columnDescriptor.getType() == 
PrimitiveType.PrimitiveTypeName.BOOLEAN) {
+    if (valueReader == null && parentColumnReader.columnDescriptor.getType() 
== PrimitiveType.PrimitiveTypeName.BOOLEAN) {
       valueReader = 
valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, 
ValuesType.VALUES);
-      valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) 
readPosInBytes);
+      valueReader.initFromPage(currentPageCount, in);
     }
     if (valueEncoding.usesDictionary()) {
       // initialize two of the dictionary readers, one is for determining the 
lengths of each value, the second is for
       // actually copying the values out into the vectors
+      Preconditions.checkState(readPosInBytes < pageData.capacity());
+      int index = (int)readPosInBytes;
+      ByteBuffer byteBuffer = pageData.nioBuffer(index, pageData.capacity() - 
index);
       dictionaryLengthDeterminingReader = new 
DictionaryValuesReader(dictionary);
-      dictionaryLengthDeterminingReader.initFromPage(currentPageCount, 
pageDataBuffer, (int) readPosInBytes);
+      dictionaryLengthDeterminingReader.initFromPage(currentPageCount, 
ByteBufferInputStream.wrap(byteBuffer));
       dictionaryValueReader = new DictionaryValuesReader(dictionary);
-      dictionaryValueReader.initFromPage(currentPageCount, pageDataBuffer, 
(int) readPosInBytes);
+      dictionaryValueReader.initFromPage(currentPageCount, 
ByteBufferInputStream.wrap(byteBuffer));
       parentColumnReader.usingDictionary = true;
     } else {
       parentColumnReader.usingDictionary = false;
@@ -445,25 +450,29 @@ public void clear(){
    * @throws IOException An IO related condition
    */
   void resetDefinitionLevelReader(int skipCount) throws IOException {
-    if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0) {
-      throw new UnsupportedOperationException("Unsupoorted Operation");
-    }
+    
Preconditions.checkState(parentColumnReader.columnDescriptor.getMaxDefinitionLevel()
 == 1);
+    Preconditions.checkState(currentPageCount > 0);
 
+    final Encoding rlEncoding = 
METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
     final Encoding dlEncoding = 
METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding);
-    final ByteBuffer pageDataBuffer = pageData.nioBuffer(0, 
pageData.capacity());
-    final int defStartPos = repetitionLevels != null ? 
repetitionLevels.getNextOffset() : 0;
+
+    final ByteBufferInputStream in = 
ByteBufferInputStream.wrap(pageData.nioBuffer(0, pageData.capacity()));
+
+    if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
+      repetitionLevels = 
rlEncoding.getValuesReader(parentColumnReader.columnDescriptor, 
ValuesType.REPETITION_LEVEL);
+      repetitionLevels.initFromPage(currentPageCount, in);
+      repetitionLevels.readInteger();
+    }
+
     definitionLevels = 
dlEncoding.getValuesReader(parentColumnReader.columnDescriptor, 
ValuesType.DEFINITION_LEVEL);
     parentColumnReader.currDefLevel = -1;
 
     // Now reinitialize the underlying decoder
-    assert currentPageCount > 0 : "Page count should be strictly upper than 
zero";
-    definitionLevels.initFromPage(currentPageCount, pageDataBuffer, 
defStartPos);
+    definitionLevels.initFromPage(currentPageCount, in);
 
     // Skip values if requested by caller
     for (int idx = 0; idx < skipCount; ++idx) {
       definitionLevels.skip();
     }
   }
-
-
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
index b6205c1ef3..385cb8369f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
@@ -66,12 +66,7 @@
     this.buffer.order(ByteOrder.nativeOrder());
 
     if (pageInfoInput != null) {
-      this.pageInfo.pageData = pageInfoInput.pageData;
-      this.pageInfo.pageDataOff = pageInfoInput.pageDataOff;
-      this.pageInfo.pageDataLen = pageInfoInput.pageDataLen;
-      this.pageInfo.numPageFieldsRead = pageInfoInput.numPageFieldsRead;
-      this.pageInfo.definitionLevels = pageInfoInput.definitionLevels;
-      this.pageInfo.dictionaryValueReader = 
pageInfoInput.dictionaryValueReader;
+      set(pageInfoInput, false);
     }
 
     this.columnPrecInfo = columnPrecInfoInput;
@@ -87,15 +82,17 @@
     nullableDictionaryReader = new VarLenNullableDictionaryReader(buffer, 
pageInfo, columnPrecInfo, entry);
   }
 
-  final void set(PageDataInfo pageInfoInput) {
+  final void set(PageDataInfo pageInfoInput, boolean clear) {
     pageInfo.pageData = pageInfoInput.pageData;
     pageInfo.pageDataOff = pageInfoInput.pageDataOff;
     pageInfo.pageDataLen = pageInfoInput.pageDataLen;
     pageInfo.numPageFieldsRead = pageInfoInput.numPageFieldsRead;
     pageInfo.definitionLevels = pageInfoInput.definitionLevels;
     pageInfo.dictionaryValueReader = pageInfoInput.dictionaryValueReader;
-
-    buffer.clear();
+    pageInfo.numPageValues = pageInfoInput.numPageValues;
+    if (clear) {
+      buffer.clear();
+    }
   }
 
   final VarLenColumnBulkEntry getEntry(int valuesToRead) {
@@ -160,4 +157,4 @@ private final VarLenColumnBulkEntry getVLEntry(int 
valuesToRead) {
     }
   }
 
-}
\ No newline at end of file
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
index 8daf2cc1a5..1b30737597 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
@@ -204,7 +204,7 @@ private final void setBufferedPagePayload() {
         buffPagePayload = new VarLenBulkPageReader(pageInfo, columnPrecInfo, 
callback);
 
       } else {
-        buffPagePayload.set(pageInfo);
+        buffPagePayload.set(pageInfo, true);
       }
     } else {
       if (buffPagePayload == null) {
@@ -567,4 +567,4 @@ private Binary getNextEntry() {
   }
 
 
-}
\ No newline at end of file
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index ab655e9217..a61cc18328 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -39,17 +39,19 @@
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
@@ -87,6 +89,7 @@
   public static final String[] OLD_METADATA_FILENAMES = 
{".drill.parquet_metadata.v2"};
   public static final String METADATA_FILENAME = ".drill.parquet_metadata";
   public static final String METADATA_DIRECTORIES_FILENAME = 
".drill.parquet_metadata_directories";
+  public static final String PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED = 
"parquet.strings.signed-min-max.enabled";
 
   private final ParquetFormatConfig formatConfig;
 
@@ -409,9 +412,16 @@ private ParquetFileMetadata_v3 
getParquetFileMetadata_v3(ParquetTableMetadata_v3
       final FileStatus file, final FileSystem fs) throws IOException, 
InterruptedException {
     final ParquetMetadata metadata;
     final UserGroupInformation processUserUgi = 
ImpersonationUtil.getProcessUserUGI();
+    final Configuration conf = new Configuration(fs.getConf());
+    final ParquetReadOptions parquetReadOptions = ParquetReadOptions.builder()
+        .useSignedStringMinMax(true)
+        .build();
     try {
-      metadata = 
processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)
-          () -> ParquetFileReader.readFooter(fs.getConf(), file, 
ParquetMetadataConverter.NO_FILTER));
+      metadata = 
processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)() -> {
+        try (ParquetFileReader parquetFileReader = 
ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), 
parquetReadOptions)) {
+          return parquetFileReader.getFooter();
+        }
+      });
     } catch(Exception e) {
       logger.error("Exception while reading footer of parquet file [Details - 
path: {}, owner: {}] as process user {}",
         file.getPath(), file.getOwner(), processUserUgi.getShortUserName(), e);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
index f208d6e823..1d764b1b6d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
@@ -21,7 +21,7 @@
 import com.google.common.base.Stopwatch;
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -179,7 +179,7 @@ private int getNextBlock() throws IOException {
     int nBytes = 0;
     if (bytesToRead > 0) {
       try {
-        nBytes = CompatibilityUtil.getBuf(getInputStream(), directBuffer, 
bytesToRead);
+        nBytes = HadoopStreams.wrap(getInputStream()).read(directBuffer);
       } catch (Exception e) {
         logger.error("Error reading from stream {}. Error was : {}", 
this.streamId, e.getMessage());
         throw new IOException((e));
@@ -193,8 +193,8 @@ private int getNextBlock() throws IOException {
           logger.trace(
               "PERF: Disk read complete. {}, StartOffset: {}, TotalByteSize: 
{}, BufferSize: {}, BytesRead: {}, Count: {}, "
                   + "CurPosInStream: {}, CurPosInBuffer: {}, Time: {} ms", 
this.streamId, this.startOffset,
-              this.totalByteSize, this.bufSize, bytesRead, this.count, 
this.curPosInStream, this.curPosInBuffer, ((double) 
timer.elapsed(TimeUnit.MICROSECONDS))
-                  / 1000);
+              this.totalByteSize, this.bufSize, bytesRead, this.count, 
this.curPosInStream, this.curPosInBuffer,
+              ((double) timer.elapsed(TimeUnit.MICROSECONDS)) / 1000);
         }
       }
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
index ae09a3708c..ea2542eb80 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
@@ -23,7 +23,8 @@
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.SeekableInputStream;
 
 import java.io.FilterInputStream;
 import java.io.IOException;
@@ -86,12 +87,16 @@ public synchronized int read(DrillBuf buf, int off, int 
len) throws IOException
     buf.clear();
     ByteBuffer directBuffer = buf.nioBuffer(0, len);
     int lengthLeftToRead = len;
+    SeekableInputStream seekableInputStream = 
HadoopStreams.wrap(getInputStream());
     while (lengthLeftToRead > 0) {
       if(logger.isTraceEnabled()) {
         logger.trace("PERF: Disk read start. {}, StartOffset: {}, 
TotalByteSize: {}", this.streamId, this.startOffset, this.totalByteSize);
       }
       Stopwatch timer = Stopwatch.createStarted();
-      int bytesRead = CompatibilityUtil.getBuf(getInputStream(), directBuffer, 
lengthLeftToRead);
+      int bytesRead = seekableInputStream.read(directBuffer);
+      if (bytesRead < 0) {
+        return bytesRead;
+      }
       lengthLeftToRead -= bytesRead;
       if(logger.isTraceEnabled()) {
         logger.trace(
@@ -113,7 +118,7 @@ public synchronized DrillBuf getNext(int bytes) throws 
IOException {
       b.release();
       throw e;
     }
-    if (bytesRead <= -1) {
+    if (bytesRead < 0) {
       b.release();
       return null;
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
 
b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 6e9db7e947..89731ff2a4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ 
b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -46,7 +46,7 @@
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
 
 import io.netty.buffer.ByteBuf;
 
@@ -163,12 +163,10 @@ public DataPage readPage() {
               ByteBuf buf = allocator.buffer(pageHeader.compressed_page_size);
               lastPage = buf;
               ByteBuffer buffer = buf.nioBuffer(0, 
pageHeader.compressed_page_size);
-              int lengthLeftToRead = pageHeader.compressed_page_size;
-              while (lengthLeftToRead > 0) {
-                lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, 
lengthLeftToRead);
-              }
+              HadoopStreams.wrap(in).readFully(buffer);
+              buffer.flip();
               return new DataPageV1(
-                      decompressor.decompress(BytesInput.from(buffer, 0, 
pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+                      decompressor.decompress(BytesInput.from(buffer), 
pageHeader.getUncompressed_page_size()),
                       pageHeader.data_page_header.num_values,
                       pageHeader.uncompressed_page_size,
                       
fromParquetStatistics(pageHeader.data_page_header.statistics, 
columnDescriptor.getType()),
@@ -182,28 +180,33 @@ public DataPage readPage() {
               buf = allocator.buffer(pageHeader.compressed_page_size);
               lastPage = buf;
               buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
-              lengthLeftToRead = pageHeader.compressed_page_size;
-              while (lengthLeftToRead > 0) {
-                lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, 
lengthLeftToRead);
-              }
+              HadoopStreams.wrap(in).readFully(buffer);
+              buffer.flip();
               DataPageHeaderV2 dataHeaderV2 = 
pageHeader.getData_page_header_v2();
               int dataSize = compressedPageSize - 
dataHeaderV2.getRepetition_levels_byte_length() - 
dataHeaderV2.getDefinition_levels_byte_length();
               BytesInput decompressedPageData =
                   decompressor.decompress(
-                      BytesInput.from(buffer, 0, 
pageHeader.compressed_page_size),
+                      BytesInput.from(buffer),
                       pageHeader.uncompressed_page_size);
+              ByteBuffer byteBuffer = decompressedPageData.toByteBuffer();
+              int limit = byteBuffer.limit();
+              
byteBuffer.limit(dataHeaderV2.getRepetition_levels_byte_length());
+              BytesInput repetitionLevels = 
BytesInput.from(byteBuffer.slice());
+              
byteBuffer.position(dataHeaderV2.getRepetition_levels_byte_length());
+              byteBuffer.limit(dataHeaderV2.getRepetition_levels_byte_length() 
+ dataHeaderV2.getDefinition_levels_byte_length());
+              BytesInput definitionLevels = 
BytesInput.from(byteBuffer.slice());
+              
byteBuffer.position(dataHeaderV2.getRepetition_levels_byte_length() + 
dataHeaderV2.getDefinition_levels_byte_length());
+              byteBuffer.limit(limit);
+              BytesInput data = BytesInput.from(byteBuffer.slice());
+
               return new DataPageV2(
                       dataHeaderV2.getNum_rows(),
                       dataHeaderV2.getNum_nulls(),
                       dataHeaderV2.getNum_values(),
-                      BytesInput.from(decompressedPageData.toByteBuffer(), 0, 
dataHeaderV2.getRepetition_levels_byte_length()),
-                      BytesInput.from(decompressedPageData.toByteBuffer(),
-                          dataHeaderV2.getRepetition_levels_byte_length(),
-                          dataHeaderV2.getDefinition_levels_byte_length()),
+                      repetitionLevels,
+                      definitionLevels,
                       
parquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()),
-                      BytesInput.from(decompressedPageData.toByteBuffer(),
-                          dataHeaderV2.getRepetition_levels_byte_length() + 
dataHeaderV2.getDefinition_levels_byte_length(),
-                          dataSize),
+                      data,
                       uncompressedPageSize,
                       fromParquetStatistics(dataHeaderV2.getStatistics(), 
columnDescriptor.getType()),
                       dataHeaderV2.isIs_compressed()
diff --git 
a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
 
b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
index 93f9920bd9..0ed2245250 100644
--- 
a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
+++ 
b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
@@ -17,8 +17,6 @@
  */
 package org.apache.parquet.hadoop;
 
-import static 
org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
@@ -119,7 +117,7 @@ private ColumnChunkPageWriter(ColumnDescriptor path,
       this.path = path;
       this.compressor = compressor;
       this.buf = new CapacityByteArrayOutputStream(initialSlabSize, 
maxCapacityHint, allocator);
-      this.totalStatistics = getStatsBasedOnType(this.path.getType());
+      this.totalStatistics = 
Statistics.createStats(this.path.getPrimitiveType());
     }
 
     @Override
@@ -226,11 +224,7 @@ public void writeToFileWriter(ParquetFileWriter writer) 
throws IOException {
         writer.writeDictionaryPage(dictionaryPage);
         // tracking the dictionary encoding is handled in writeDictionaryPage
       }
-      List<Encoding> encodings = Lists.newArrayList();
-      encodings.addAll(rlEncodings);
-      encodings.addAll(dlEncodings);
-      encodings.addAll(dataEncodings);
-      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, 
compressedLength, totalStatistics, encodings);
+      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, 
compressedLength, totalStatistics, rlEncodings, dlEncodings, dataEncodings);
       writer.endColumn();
       logger.debug(
           String.format(
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index 50e679ad64..1da2530131 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -30,6 +30,7 @@
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -737,6 +738,7 @@ public void testBooleanPartitionPruning() throws Exception {
     }
   }
 
+  @Ignore // Statistics for INTERVAL is not available (see PARQUET-1064)
   @Test // DRILL-4139
   public void testIntervalDayPartitionPruning() throws Exception {
     final String intervalDayPartitionTable = 
"dfs.tmp.`interval_day_partition`";
@@ -762,6 +764,7 @@ public void testIntervalDayPartitionPruning() throws 
Exception {
     }
   }
 
+  @Ignore // Statistics for INTERVAL is not available (see PARQUET-1064)
   @Test // DRILL-4139
   public void testIntervalYearPartitionPruning() throws Exception {
     final String intervalYearPartitionTable = 
"dfs.tmp.`interval_yr_partition`";
@@ -812,6 +815,7 @@ public void testVarCharWithNullsPartitionPruning() throws 
Exception {
     }
   }
 
+  @Ignore // Statistics for DECIMAL is not available (see PARQUET-1322).
   @Test // DRILL-4139
   public void testDecimalPartitionPruning() throws Exception {
     List<String> ctasQueries = Lists.newArrayList();
diff --git a/pom.xml b/pom.xml
index 6078dc7c10..242b134ddb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,7 +44,7 @@
     <dep.slf4j.version>1.7.6</dep.slf4j.version>
     <dep.guava.version>18.0</dep.guava.version>
     <forkCount>2</forkCount>
-    <parquet.version>1.8.1-drill-r0</parquet.version>
+    <parquet.version>1.10.0</parquet.version>
     <calcite.version>1.16.0-drill-r3</calcite.version>
     <avatica.version>1.11.0</avatica.version>
     <janino.version>2.7.6</janino.version>
@@ -1522,6 +1522,36 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-format</artifactId>
+        <version>2.5.0</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-common</artifactId>
+        <version>${parquet.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
@@ -2040,6 +2070,14 @@
                <artifactId>parquet-hadoop</artifactId>
                <version>${parquet.version}</version>
                <exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-client</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-common</artifactId>
+              </exclusion>
                        <exclusion>
                                <groupId>org.xerial.snappy</groupId>
                                <artifactId>snappy-java</artifactId>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Upgrade Parquet MR dependencies
> -------------------------------
>
>                 Key: DRILL-6353
>                 URL: https://issues.apache.org/jira/browse/DRILL-6353
>             Project: Apache Drill
>          Issue Type: Task
>            Reporter: Vlad Rozov
>            Assignee: Vlad Rozov
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.14.0
>
>         Attachments: alltypes_optional.json, fixedlenDecimal.json
>
>
> Upgrade from a custom build {{1.8.1-drill-r0}} to Apache release {{1.10.0}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to