This is an automated email from the ASF dual-hosted git repository.

ihuzenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit ab65a18237e434742a6c2c546a0411178bf397a5
Author: Bohdan Kazydub <[email protected]>
AuthorDate: Tue Jan 21 15:02:33 2020 +0200

    DRILL-7504: Upgrade Parquet library to 1.11.0
    
    closes #1970
---
 .../ConvertMetadataAggregateToDirectScanRule.java  |  40 +++-
 .../exec/store/parquet/ParquetRecordWriter.java    |   6 +-
 .../store/parquet/ParquetTableMetadataUtils.java   |  10 +-
 .../hadoop/ParquetColumnChunkPageWriteStore.java   | 127 ++++++++-----
 .../store/parquet/TestParquetMetadataCache.java    |   3 -
 .../parquet/TestPushDownAndPruningForDecimal.java  | 210 +++++++++++++--------
 .../parquet/TestPushDownAndPruningForVarchar.java  |   2 -
 pom.xml                                            |   4 +-
 8 files changed, 268 insertions(+), 134 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java
index ed4038f..80a463b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.logical;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelNode;
+import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.expr.IsPredicate;
 import org.apache.drill.exec.metastore.ColumnNamesOptions;
@@ -30,6 +31,9 @@ import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.DictColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumns;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
@@ -46,6 +50,7 @@ import org.apache.drill.metastore.statistics.ColumnStatisticsKind;
 import org.apache.drill.metastore.statistics.ExactStatisticsConstants;
 import org.apache.drill.metastore.statistics.StatisticsKind;
 import org.apache.drill.metastore.statistics.TableStatisticsKind;
+import org.apache.drill.metastore.util.SchemaPathUtils;
 import org.apache.drill.shaded.guava.com.google.common.collect.HashBasedTable;
 import org.apache.drill.shaded.guava.com.google.common.collect.Multimap;
 import org.apache.drill.shaded.guava.com.google.common.collect.Table;
@@ -202,6 +207,12 @@ public class ConvertMetadataAggregateToDirectScanRule extends RelOptRule {
       // populates record list with row group column metadata
       for (SchemaPath schemaPath : interestingColumns) {
         ColumnStatistics<?> columnStatistics = rowGroupMetadata.getColumnsStatistics().get(schemaPath);
+
+        // do not gather statistics for array columns as it is not supported by Metastore
+        if (containsArrayColumn(rowGroupMetadata.getSchema(), schemaPath)) {
+          return null;
+        }
+
         if (IsPredicate.isNullOrEmpty(columnStatistics)) {
           logger.debug("Statistics for {} column wasn't found within {} row group.", schemaPath, path);
           return null;
@@ -215,7 +226,7 @@ public class ConvertMetadataAggregateToDirectScanRule extends RelOptRule {
           } else {
             statsValue = columnStatistics.get(statisticsKind);
           }
-          String columnStatisticsFieldName = AnalyzeColumnUtils.getColumnStatisticsFieldName(schemaPath.getRootSegmentPath(), statisticsKind);
+          String columnStatisticsFieldName = AnalyzeColumnUtils.getColumnStatisticsFieldName(schemaPath.toExpr(), statisticsKind);
           if (statsValue != null) {
             schema.putIfAbsent(
                 columnStatisticsFieldName,
@@ -268,4 +279,31 @@ public class ConvertMetadataAggregateToDirectScanRule extends RelOptRule {
 
     return new DirectGroupScan(reader, scanStats);
   }
+
+  /**
+   * Checks whether schema path contains array segment.
+   *
+   * @param schema tuple schema
+   * @param schemaPath schema path
+   * @return {@code true} if any segment in the schema path is an array, {@code false} otherwise
+   */
+  private static boolean containsArrayColumn(TupleMetadata schema, SchemaPath schemaPath) {
+    PathSegment currentPath = schemaPath.getRootSegment();
+    ColumnMetadata columnMetadata = schema.metadata(currentPath.getNameSegment().getPath());
+    while (columnMetadata != null) {
+      if (columnMetadata.isArray()) {
+        return true;
+      } else if (columnMetadata.isMap()) {
+        currentPath = currentPath.getChild();
+        columnMetadata = columnMetadata.tupleSchema()
+            .metadata(currentPath.getNameSegment().getPath());
+      } else if (columnMetadata.isDict()) {
+        currentPath = currentPath.getChild();
+        columnMetadata = ((DictColumnMetadata) columnMetadata).valueColumnMetadata();
+      } else {
+        return false;
+      }
+    }
+    return false;
+  }
 }
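
A small, self-contained sketch of the walk that the new containsArrayColumn() method performs may help readers follow the rule change without loading a Drill schema. It is illustrative only: the Node type below is a hypothetical stand-in for Drill's ColumnMetadata/TupleMetadata tree and is not part of the Drill or Parquet APIs.

    import java.util.List;

    public class ArrayPathCheckSketch {

      enum Kind { PRIMITIVE, ARRAY, MAP }

      // Hypothetical stand-in for ColumnMetadata: a named node with one optional nested member.
      record Node(String name, Kind kind, Node child) { }

      // Mirrors the walk above: true as soon as any resolved segment is an array.
      static boolean containsArray(Node root, List<String> path) {
        Node current = root;
        for (String segment : path) {
          if (current == null || !current.name().equals(segment)) {
            return false; // path cannot be resolved against the schema
          }
          if (current.kind() == Kind.ARRAY) {
            return true;  // array segment found, statistics are skipped
          }
          current = current.child(); // descend into the nested member
        }
        return false;
      }

      public static void main(String[] args) {
        Node schema = new Node("map_col", Kind.MAP, new Node("arr_field", Kind.ARRAY, null));
        System.out.println(containsArray(schema, List.of("map_col", "arr_field"))); // true
        System.out.println(containsArray(schema, List.of("map_col")));              // false
      }
    }

The walk stops with true at the first array segment, which is exactly the condition under which the rule above refuses to convert the metadata aggregate to a direct scan.
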
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 9541006..4fd5064 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -255,8 +255,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(64, pageSize, 10);
     // TODO: Replace ParquetColumnChunkPageWriteStore with ColumnChunkPageWriteStore from parquet library
     // once PARQUET-1006 will be resolved
-    pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, initialSlabSize,
-        pageSize, new ParquetDirectByteBufferAllocator(oContext));
     ParquetProperties parquetProperties = ParquetProperties.builder()
         .withPageSize(pageSize)
         .withDictionaryEncoding(enableDictionary)
@@ -265,6 +263,10 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
         .withAllocator(new ParquetDirectByteBufferAllocator(oContext))
         .withValuesWriterFactory(new DefaultV1ValuesWriterFactory())
         .build();
+    pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, initialSlabSize,
+        pageSize, parquetProperties.getAllocator(), parquetProperties.getPageWriteChecksumEnabled(),
+        parquetProperties.getColumnIndexTruncateLength()
+    );
     store = new ColumnWriteStoreV1(pageStore, parquetProperties);
     MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
     consumer = columnIO.getRecordWriter(store);
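
A note on the reordering above: the page write store is now constructed after ParquetProperties, because it reads the allocator, the page-checksum flag and the column-index truncate length from that object. The sketch below only exercises builder and getter calls that appear in this diff; the page size value is illustrative, not Drill's default.

    import org.apache.parquet.column.ParquetProperties;

    public class ParquetPropertiesSketch {
      public static void main(String[] args) {
        // Build the properties first, as ParquetRecordWriter now does.
        ParquetProperties props = ParquetProperties.builder()
            .withPageSize(1024 * 1024)    // illustrative page size
            .withDictionaryEncoding(true)
            .build();

        // The page write store consumes these three settings from the properties object.
        System.out.println(props.getAllocator());
        System.out.println(props.getPageWriteChecksumEnabled());
        System.out.println(props.getColumnIndexTruncateLength());
      }
    }
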
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
index 0bad959..074709d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 import org.joda.time.DateTimeConstants;
 
 import java.math.BigDecimal;
@@ -264,7 +265,7 @@ public class ParquetTableMetadataUtils {
       SchemaPath colPath = SchemaPath.getCompoundPath(column.getName());
 
       Long nulls = column.getNulls();
-      if (!column.isNumNullsSet() || nulls == null) {
+      if (hasInvalidStatistics(column, tableMetadata)) {
         nulls = Statistic.NO_COLUMN_STATS;
       }
       PrimitiveType.PrimitiveTypeName primitiveType = getPrimitiveTypeName(tableMetadata, column);
@@ -280,6 +281,13 @@ public class ParquetTableMetadataUtils {
     return columnsStatistics;
   }
 
+  private static boolean hasInvalidStatistics(MetadataBase.ColumnMetadata column,
+        MetadataBase.ParquetTableMetadataBase tableMetadata) {
+    return !column.isNumNullsSet() || ((column.getMinValue() == null || column.getMaxValue() == null)
+        && column.getNulls() == 0
+        && tableMetadata.getRepetition(column.getName()) == Type.Repetition.REQUIRED);
+  }
+
   /**
    * Returns the non-interesting column's metadata
    * @param parquetTableMetadata the source of column metadata for non-interesting column's statistics
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
index 01a5485..b8f707d 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
@@ -19,13 +19,14 @@ package org.apache.parquet.hadoop;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.zip.CRC32;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -36,6 +37,8 @@ import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.bytes.ByteBufferAllocator;
@@ -53,17 +56,20 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
 
   private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
 
-  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = Maps.newHashMap();
+  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<>();
   private final MessageType schema;
 
   public ParquetColumnChunkPageWriteStore(BytesCompressor compressor,
                                           MessageType schema,
                                           int initialSlabSize,
                                           int maxCapacityHint,
-                                          ByteBufferAllocator allocator) {
+                                          ByteBufferAllocator allocator,
+                                          boolean pageWriteChecksumEnabled,
+                                          int columnIndexTruncateLength) {
     this.schema = schema;
     for (ColumnDescriptor path : schema.getColumns()) {
-      writers.put(path,  new ColumnChunkPageWriter(path, compressor, initialSlabSize, maxCapacityHint, allocator));
+      writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSlabSize,
+          maxCapacityHint, allocator, pageWriteChecksumEnabled, columnIndexTruncateLength));
     }
   }
 
@@ -105,37 +111,54 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
     private int pageCount;
 
     // repetition and definition level encodings are used only for v1 pages and don't change
-    private Set<Encoding> rlEncodings = Sets.newHashSet();
-    private Set<Encoding> dlEncodings = Sets.newHashSet();
-    private List<Encoding> dataEncodings = Lists.newArrayList();
+    private Set<Encoding> rlEncodings = new HashSet<>();
+    private Set<Encoding> dlEncodings = new HashSet<>();
+    private List<Encoding> dataEncodings = new ArrayList<>();
 
+    private ColumnIndexBuilder columnIndexBuilder;
+    private OffsetIndexBuilder offsetIndexBuilder;
     private Statistics totalStatistics;
 
+    private final CRC32 crc;
+    boolean pageWriteChecksumEnabled;
+
     private ColumnChunkPageWriter(ColumnDescriptor path,
                                   BytesCompressor compressor,
                                   int initialSlabSize,
                                   int maxCapacityHint,
-                                  ByteBufferAllocator allocator) {
+                                  ByteBufferAllocator allocator,
+                                  boolean pageWriteChecksumEnabled,
+                                  int columnIndexTruncateLength) {
       this.path = path;
       this.compressor = compressor;
       this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator);
       this.totalStatistics = Statistics.createStats(this.path.getPrimitiveType());
+      this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength);
+      this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+      this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
+      this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
+    }
+
+    @Override
+    public void writePage(BytesInput bytesInput, int valueCount, Statistics<?> statistics, Encoding rlEncoding,
+          Encoding dlEncoding, Encoding valuesEncoding) throws IOException {
+      // Setting the builders to the no-op ones so no column/offset indexes will be written for this column chunk
+      columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+      offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
+
+      writePage(bytesInput, valueCount, -1, statistics, rlEncoding, dlEncoding, valuesEncoding);
     }
 
     @Override
-    public void writePage(BytesInput bytes,
-                          int valueCount,
-                          Statistics statistics,
-                          Encoding rlEncoding,
-                          Encoding dlEncoding,
-                          Encoding valuesEncoding) throws IOException {
+    public void writePage(BytesInput bytes, int valueCount, int rowCount, Statistics statistics,
+                          Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException {
       long uncompressedSize = bytes.size();
-      // Parquet library creates bad metadata if the uncompressed or compressed size of a page exceeds Integer.MAX_VALUE
       if (uncompressedSize > Integer.MAX_VALUE) {
         throw new ParquetEncodingException(
             "Cannot write page larger than Integer.MAX_VALUE bytes: " +
                 uncompressedSize);
       }
+
       BytesInput compressedBytes = compressor.compress(bytes);
       long compressedSize = compressedBytes.size();
       if (compressedSize > Integer.MAX_VALUE) {
@@ -143,26 +166,43 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
             "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
                 + compressedSize);
       }
-      parquetMetadataConverter.writeDataPageHeader(
-          (int)uncompressedSize,
-          (int)compressedSize,
-          valueCount,
-          statistics,
-          rlEncoding,
-          dlEncoding,
-          valuesEncoding,
-          buf);
+
+      if (pageWriteChecksumEnabled) {
+        crc.reset();
+        crc.update(compressedBytes.toByteArray());
+        parquetMetadataConverter.writeDataPageV1Header((int) uncompressedSize, (int) compressedSize,
+            valueCount, rlEncoding, dlEncoding, valuesEncoding, (int) crc.getValue(), buf);
+      } else {
+        parquetMetadataConverter.writeDataPageV1Header((int) uncompressedSize, (int) compressedSize,
+            valueCount, rlEncoding, dlEncoding, valuesEncoding, buf);
+      }
+
       this.uncompressedLength += uncompressedSize;
       this.compressedLength += compressedSize;
       this.totalValueCount += valueCount;
       this.pageCount += 1;
-      this.totalStatistics.mergeStatistics(statistics);
+
+      addStatistics(statistics);
+
+      offsetIndexBuilder.add(toIntWithCheck(buf.size() + compressedSize), rowCount);
+
       compressedBytes.writeAllTo(buf);
       rlEncodings.add(rlEncoding);
       dlEncodings.add(dlEncoding);
       dataEncodings.add(valuesEncoding);
     }
 
+    private void addStatistics(Statistics statistics) {
+      // Copying the statistics if it is not initialized yet so we have the correct typed one
+      if (totalStatistics == null) {
+        totalStatistics = statistics.copy();
+      } else {
+        totalStatistics.mergeStatistics(statistics);
+      }
+
+      columnIndexBuilder.add(statistics);
+    }
+
     @Override
     public void writePageV2(int rowCount,
                             int nullCount,
@@ -193,8 +233,12 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
       this.compressedLength += compressedSize;
       this.totalValueCount += valueCount;
       this.pageCount += 1;
-      this.totalStatistics.mergeStatistics(statistics);
 
+      addStatistics(statistics);
+
+      offsetIndexBuilder.add(toIntWithCheck(buf.size() + compressedSize), rowCount);
+
+      repetitionLevels.writeAllTo(buf);
       definitionLevels.writeAllTo(buf);
       compressedData.writeAllTo(buf);
 
@@ -221,21 +265,20 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
      * @throws IOException if the file can not be created
      */
     public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
-      writer.startColumn(path, totalValueCount, compressor.getCodecName());
-      if (dictionaryPage != null) {
-        writer.writeDictionaryPage(dictionaryPage);
-        // tracking the dictionary encoding is handled in writeDictionaryPage
+      writer.writeColumnChunk(path, totalValueCount, compressor.getCodecName(),
+          dictionaryPage, BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics,
+          columnIndexBuilder, offsetIndexBuilder, rlEncodings, dlEncodings, dataEncodings);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            String.format(
+                "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
+                buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, new HashSet<>(dataEncodings))
+                + (dictionaryPage != null ? String.format(
+                ", dic { %,d entries, %,dB raw, %,dB comp}",
+                dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
+                : "")
+        );
       }
-      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, rlEncodings, dlEncodings, dataEncodings);
-      writer.endColumn();
-      logger.debug(
-          String.format(
-              "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
-              buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, Sets.newHashSet(dataEncodings))
-              + (dictionaryPage != null ? String.format(
-              ", dic { %,d entries, %,dB raw, %,dB comp}",
-              dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
-              : ""));
       rlEncodings.clear();
       dlEncodings.clear();
       dataEncodings.clear();
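
The checksum branch added to writePage() can be reproduced in isolation: with page checksums enabled, parquet 1.11.0 stores a CRC32 of the compressed page bytes, narrowed to a signed int, in the data page header. A minimal sketch, assuming a made-up payload:

    import java.nio.charset.StandardCharsets;
    import java.util.zip.CRC32;

    public class PageChecksumSketch {
      public static void main(String[] args) {
        byte[] compressedPage = "example-compressed-page-bytes".getBytes(StandardCharsets.UTF_8);

        CRC32 crc = new CRC32();
        crc.reset();
        crc.update(compressedPage);

        // Narrowing to int matches the (int) crc.getValue() passed to writeDataPageV1Header.
        int headerCrc = (int) crc.getValue();
        System.out.printf("crc32=0x%08x%n", headerCrc);
      }
    }
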
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index 5ad5a83..e7e9c0c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -30,7 +30,6 @@ import org.apache.drill.exec.store.parquet.metadata.MetadataVersion;
 import org.apache.drill.test.TestBuilder;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -743,7 +742,6 @@ public class TestParquetMetadataCache extends PlanTestBase {
     }
   }
 
-  @Ignore // Statistics for INTERVAL is not available (see PARQUET-1064)
   @Test // DRILL-4139
   public void testIntervalDayPartitionPruning() throws Exception {
     final String intervalDayPartitionTable = "dfs.tmp.`interval_day_partition`";
@@ -769,7 +767,6 @@ public class TestParquetMetadataCache extends PlanTestBase {
     }
   }
 
-  @Ignore // Statistics for INTERVAL is not available (see PARQUET-1064)
   @Test // DRILL-4139
   public void testIntervalYearPartitionPruning() throws Exception {
     final String intervalYearPartitionTable = "dfs.tmp.`interval_yr_partition`";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForDecimal.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForDecimal.java
index 6f1f676..36c5670 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForDecimal.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForDecimal.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.store.parquet;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.drill.PlanTestBase;
 import org.apache.drill.categories.ParquetTest;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.exec.ExecConstants;
@@ -27,7 +26,6 @@ import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.junit.After;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -43,9 +41,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -126,10 +121,12 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
       for (String table : Arrays.asList(oldTable, newTable)) {
         String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(1.00 as decimal(5, 2))", table, column);
 
-        String plan = client.queryBuilder().sql(query).explainText();
-        assertThat(plan, containsString("numRowGroups=1"));
-        assertThat(plan, containsString("usedMetadataFile=false"));
-        assertThat(plan, not(containsString("Filter")));
+        queryBuilder()
+            .sql(query)
+            .planMatcher()
+            .include("numRowGroups=1", "usedMetadataFile=false")
+            .exclude("Filter")
+            .match();
 
         client.testBuilder()
           .sqlQuery(query)
@@ -160,10 +157,12 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
       for (String column : Arrays.asList("part_int_32", "part_int_64")) {
         String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(2.00 as decimal(5,2))", table, column);
 
-        String plan = client.queryBuilder().sql(query).explainText();
-        assertThat(plan, containsString("numRowGroups=1"));
-        assertThat(plan, containsString("usedMetadataFile=true"));
-        assertThat(plan, not(containsString("Filter")));
+        queryBuilder()
+            .sql(query)
+            .planMatcher()
+            .include("numRowGroups=1", "usedMetadataFile=true")
+            .exclude("Filter")
+            .match();
 
         client.testBuilder()
           .sqlQuery(query)
@@ -188,10 +187,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
       String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(1.05 as decimal(5, 2))",
         table, column);
 
-      String plan = client.queryBuilder().sql(query).explainText();
-      // push down does not work for old int decimal types because stats is not read: PARQUET-1322
-      assertThat(plan, containsString("numRowGroups=2"));
-      assertThat(plan, containsString("usedMetadataFile=false"));
+      queryBuilder()
+          .sql(query)
+          .planMatcher()
+          .include("numRowGroups=2", "usedMetadataFile=false")
+          .match();
 
       client.testBuilder()
         .sqlQuery(query)
@@ -213,9 +213,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
       String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(1.05 as decimal(5, 2))",
         table, column);
 
-      String plan = client.queryBuilder().sql(query).explainText();
-      assertThat(plan, containsString("numRowGroups=1"));
-      assertThat(plan, containsString("usedMetadataFile=true"));
+      queryBuilder()
+          .sql(query)
+          .planMatcher()
+          .include("numRowGroups=1", "usedMetadataFile=true")
+          .match();
 
       client.testBuilder()
         .sqlQuery(query)
@@ -239,10 +241,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
       String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(20.25 as decimal(5, 2))",
         table, column);
 
-      String plan = client.queryBuilder().sql(query).explainText();
-      // push down does not work for old int decimal types because stats is not read: PARQUET-1322
-      assertThat(plan, containsString("numRowGroups=2"));
-      assertThat(plan, containsString("usedMetadataFile=true"));
+      queryBuilder()
+          .sql(query)
+          .planMatcher()
+          .include("numRowGroups=2", "usedMetadataFile=true")
+          .match();
 
       client.testBuilder()
         .sqlQuery(query)
@@ -269,9 +272,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
       String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(20.25 as decimal(5, 2))",
         table, column);
 
-      String plan = client.queryBuilder().sql(query).explainText();
-      assertThat(plan, containsString("numRowGroups=1"));
-      assertThat(plan, containsString("usedMetadataFile=false"));
+      queryBuilder()
+          .sql(query)
+          .planMatcher()
+          .include("numRowGroups=1", "usedMetadataFile=false")
+          .match();
 
       client.testBuilder()
         .sqlQuery(query)
@@ -299,9 +304,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
       String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(20.0 as decimal(5, 2))",
         table, column);
 
-      String plan = client.queryBuilder().sql(query).explainText();
-      assertThat(plan, containsString("numRowGroups=1"));
-      assertThat(plan, containsString("usedMetadataFile=true"));
+      queryBuilder()
+          .sql(query)
+          .planMatcher()
+          .include("numRowGroups=1", "usedMetadataFile=true")
+          .match();
 
       client.testBuilder()
         .sqlQuery(query)
@@ -331,10 +338,12 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
         client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
         String query = String.format("select part_fixed, val_fixed from %s where part_fixed = cast(1.00 as decimal(5, 2))", table);
 
-        String plan = client.queryBuilder().sql(query).explainText();
-        assertThat(plan, containsString("numRowGroups=1"));
-        assertThat(plan, containsString("usedMetadataFile=false"));
-        assertThat(plan, not(containsString("Filter")));
+        queryBuilder()
+            .sql(query)
+            .planMatcher()
+            .include("numRowGroups=1", "usedMetadataFile=false")
+            .exclude("Filter")
+            .match();
 
         client.testBuilder()
           .sqlQuery(query)
@@ -359,10 +368,12 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
       client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
       String query = String.format("select part_fixed, val_fixed from %s where part_fixed = cast(1.00 as decimal(5, 2))", table);
 
-      String plan = client.queryBuilder().sql(query).explainText();
-      assertThat(plan, containsString("numRowGroups=1"));
-      assertThat(plan, containsString("usedMetadataFile=true"));
-      assertThat(plan, not(containsString("Filter")));
+      queryBuilder()
+          .sql(query)
+          .planMatcher()
+          .include("numRowGroups=1", "usedMetadataFile=true")
+          .exclude("Filter")
+          .match();
 
       client.testBuilder()
         .sqlQuery(query)
@@ -387,10 +398,12 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
      client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
      String query = String.format("select part_fixed, val_fixed from %s where part_fixed = cast(2.00 as decimal(5, 2))", table);
 
-      String plan = client.queryBuilder().sql(query).explainText();
-      assertThat(plan, containsString("numRowGroups=1"));
-      assertThat(plan, containsString("usedMetadataFile=true"));
-      assertThat(plan, not(containsString("Filter")));
+      queryBuilder()
+          .sql(query)
+          .planMatcher()
+          .include("numRowGroups=1", "usedMetadataFile=true")
+          .exclude("Filter")
+          .match();
 
       client.testBuilder()
         .sqlQuery(query)
@@ -411,10 +424,13 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
   public void testOldFixedDecimalPushDownNoMeta() throws Exception {
     String table = createTable("old_fixed_decimal_push_down_no_meta", true);
     String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(1.05 as decimal(5, 2))", table);
-    String plan = client.queryBuilder().sql(query).explainText();
+
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
     // statistics for fixed decimal is not available for files generated prior to parquet 1.10.0 version
-    assertThat(plan, containsString("numRowGroups=2"));
-    assertThat(plan, containsString("usedMetadataFile=false"));
+        .include("numRowGroups=2", "usedMetadataFile=false")
+        .match();
 
     client.testBuilder()
       .sqlQuery(query)
@@ -439,9 +455,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
 
     for (Map.Entry<String, String> property : properties.entrySet()) {
       client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, property.getKey());
-      String plan = client.queryBuilder().sql(query).explainText();
-      assertThat(plan, containsString(property.getValue()));
-      assertThat(plan, containsString("usedMetadataFile=true"));
+      queryBuilder()
+          .sql(query)
+          .planMatcher()
+          .include(property.getValue(), "usedMetadataFile=true")
+          .match();
 
       client.testBuilder()
         .sqlQuery(query)
@@ -463,9 +481,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
 
     queryBuilder().sql(String.format("refresh table metadata %s", table)).run();
 
-    String plan = client.queryBuilder().sql(query).explainText();
-    assertThat(plan, containsString("numRowGroups=2"));
-    assertThat(plan, containsString("usedMetadataFile=true"));
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("numRowGroups=2", "usedMetadataFile=true")
+        .match();
 
     client.testBuilder()
       .sqlQuery(query)
@@ -491,9 +511,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
       newTable, dataTable)).run();
 
     String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(1.05 as decimal(5, 2))", newTable);
-    String plan = client.queryBuilder().sql(query).explainText();
-    assertThat(plan, containsString("numRowGroups=1"));
-    assertThat(plan, containsString("usedMetadataFile=false"));
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("numRowGroups=1", "usedMetadataFile=false")
+        .match();
 
     client.testBuilder()
       .sqlQuery(query)
@@ -507,9 +529,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
     // metadata for binary is allowed only after Drill 1.15.0
     // set string signed option to true since test was written on Drill 1.15.0-SNAPSHOT version
     client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "true");
-    plan = client.queryBuilder().sql(query).explainText();
-    assertThat(plan, containsString("numRowGroups=1"));
-    assertThat(plan, containsString("usedMetadataFile=true"));
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("numRowGroups=1", "usedMetadataFile=true")
+        .match();
 
     client.testBuilder()
       .sqlQuery(query)
@@ -535,10 +559,12 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
       + "as select part_int_32 as part_binary, val_int_32 as val_binary from %s", newTable, dataTable)).run();
 
     String query = String.format("select part_binary, val_binary from %s where part_binary = cast(1.00 as decimal(5, 2))", newTable);
-    String plan = client.queryBuilder().sql(query).explainText();
-    assertThat(plan, containsString("numRowGroups=1"));
-    assertThat(plan, containsString("usedMetadataFile=false"));
-    assertThat(plan, not(containsString("Filter")));
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("numRowGroups=1", "usedMetadataFile=false")
+        .exclude("Filter")
+        .match();
 
     client.testBuilder()
       .sqlQuery(query)
@@ -551,10 +577,12 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
 
     queryBuilder().sql(String.format("refresh table metadata %s", newTable)).run();
 
-    plan = client.queryBuilder().sql(query).explainText();
-    assertThat(plan, containsString("numRowGroups=1"));
-    assertThat(plan, containsString("usedMetadataFile=true"));
-    assertThat(plan, not(containsString("Filter")));
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("numRowGroups=1", "usedMetadataFile=true")
+        .exclude("Filter")
+        .match();
 
     client.testBuilder()
       .sqlQuery(query)
@@ -582,9 +610,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
       + "as select part_int_32 as part_binary, val_int_32 as val_binary from %s", newTable, dataTable)).run();
 
     String query = String.format("select part_binary, val_binary from %s where val_binary = cast(1.05 as decimal(5, 2))", newTable);
-    String plan = client.queryBuilder().sql(query).explainText();
-    assertThat(plan, containsString("numRowGroups=1"));
-    assertThat(plan, containsString("usedMetadataFile=false"));
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("numRowGroups=1", "usedMetadataFile=false")
+        .match();
 
     client.testBuilder()
       .sqlQuery(query)
@@ -598,9 +628,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
     // metadata for binary is allowed only after Drill 1.15.0
     // set string signed option to true, since test was written on Drill 1.15.0-SNAPSHOT version
     client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "true");
-    plan = client.queryBuilder().sql(query).explainText();
-    assertThat(plan, containsString("numRowGroups=1"));
-    assertThat(plan, containsString("usedMetadataFile=true"));
+    queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include("numRowGroups=1", "usedMetadataFile=true")
+        .match();
 
     client.testBuilder()
       .sqlQuery(query)
@@ -624,10 +656,12 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
     for (String decimalType : Arrays.asList("decimal(5, 0)", "decimal(10, 5)", "decimal(5, 1)")) {
       String query = String.format("select part, val from %s where part = cast(2.0 as %s)", newTable, decimalType);
 
-      String plan = client.queryBuilder().sql(query).explainText();
-      assertThat(plan, containsString("numRowGroups=1"));
-      assertThat(plan, containsString("usedMetadataFile=false"));
-      assertThat(plan, not(containsString("Filter")));
+      queryBuilder()
+          .sql(query)
+          .planMatcher()
+          .include("numRowGroups=1", "usedMetadataFile=false")
+          .exclude("Filter")
+          .match();
 
       client.testBuilder()
         .sqlQuery(query)
@@ -654,9 +688,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
     for (String decimalType : Arrays.asList("decimal(5, 0)", "decimal(10, 5)", "decimal(5, 1)")) {
       String query = String.format("select part, val from %s where val = cast(20.0 as %s)", newTable, decimalType);
 
-      String plan = client.queryBuilder().sql(query).explainText();
-      assertThat(plan, containsString("numRowGroups=1"));
-      assertThat(plan, containsString("usedMetadataFile=false"));
+      queryBuilder()
+          .sql(query)
+          .planMatcher()
+          .include("numRowGroups=1", "usedMetadataFile=false")
+          .match();
 
       client.testBuilder()
         .sqlQuery(query)
@@ -667,7 +703,6 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
     }
   }
 
-  @Ignore("Statistics for DECIMAL that has all nulls is not available (PARQUET-1341). Requires upgrade to Parquet 1.11.0")
   @Test
   public void testDecimalPruningWithNullPartition() throws Exception {
     List<String> ctasQueries = new ArrayList<>();
@@ -692,13 +727,26 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest {
 
         long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
         assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-        PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+        queryBuilder()
+            .sql(query)
+            .planMatcher()
+            .include("usedMetadataFile=false")
+            .exclude("Filter")
+            .match();
 
         queryBuilder().sql(String.format("refresh table metadata %s", decimalPartitionTable)).run();
 
         actualRowCount = client.queryBuilder().sql(query).run().recordCount();
         assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-        PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+
+
+        queryBuilder()
+            .sql(query)
+            .planMatcher()
+            .include("usedMetadataFile=true")
+            .exclude("Filter")
+            .match();
       } finally {
         client.runSqlSilently(String.format("drop table if exists %s", decimalPartitionTable));
       }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForVarchar.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForVarchar.java
index bd93cd3..a8a9364 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForVarchar.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForVarchar.java
@@ -26,7 +26,6 @@ import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.QueryBuilder;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -196,7 +195,6 @@ public class TestPushDownAndPruningForVarchar extends ClusterTest {
     }
   }
 
-  @Ignore("Statistics for VARCHAR that has all nulls is not available (PARQUET-1341). Requires upgrade to Parquet 1.11.0.")
   @Test
   public void testNewFilesPruningWithNullPartition() throws Exception {
     String table = "dfs.`tmp`.`varchar_pruning_new_with_null_partition`";
diff --git a/pom.xml b/pom.xml
index f26c218..359f059 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,7 @@
     <shaded.guava.version>23.0</shaded.guava.version>
     <guava.version>19.0</guava.version>
     <forkCount>2</forkCount>
-    <parquet.version>1.10.0</parquet.version>
+    <parquet.version>1.11.0</parquet.version>
     <!--
       For development purposes to be able to use custom Calcite versions (e.g. not present in jitpack
       repository or from local repository) update this property to desired value (e.g. org.apache.calcite).
@@ -1683,7 +1683,7 @@
       <dependency>
         <groupId>org.apache.parquet</groupId>
         <artifactId>parquet-format</artifactId>
-        <version>2.5.0</version>
+        <version>2.8.0</version>
         <exclusions>
           <exclusion>
             <groupId>org.apache.hadoop</groupId>
