This is an automated email from the ASF dual-hosted git repository.

ihuzenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit ab65a18237e434742a6c2c546a0411178bf397a5 Author: Bohdan Kazydub <[email protected]> AuthorDate: Tue Jan 21 15:02:33 2020 +0200 DRILL-7504: Upgrade Parquet library to 1.11.0 closes #1970 --- .../ConvertMetadataAggregateToDirectScanRule.java | 40 +++- .../exec/store/parquet/ParquetRecordWriter.java | 6 +- .../store/parquet/ParquetTableMetadataUtils.java | 10 +- .../hadoop/ParquetColumnChunkPageWriteStore.java | 127 ++++++++----- .../store/parquet/TestParquetMetadataCache.java | 3 - .../parquet/TestPushDownAndPruningForDecimal.java | 210 +++++++++++++-------- .../parquet/TestPushDownAndPruningForVarchar.java | 2 - pom.xml | 4 +- 8 files changed, 268 insertions(+), 134 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java index ed4038f..80a463b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.logical; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.RelNode; +import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.expr.IsPredicate; import org.apache.drill.exec.metastore.ColumnNamesOptions; @@ -30,6 +31,9 @@ import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.physical.PrelUtil; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.DictColumnMetadata; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.ColumnExplorer; import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumns; import org.apache.drill.exec.store.dfs.DrillFileSystem; @@ -46,6 +50,7 @@ import org.apache.drill.metastore.statistics.ColumnStatisticsKind; import org.apache.drill.metastore.statistics.ExactStatisticsConstants; import org.apache.drill.metastore.statistics.StatisticsKind; import org.apache.drill.metastore.statistics.TableStatisticsKind; +import org.apache.drill.metastore.util.SchemaPathUtils; import org.apache.drill.shaded.guava.com.google.common.collect.HashBasedTable; import org.apache.drill.shaded.guava.com.google.common.collect.Multimap; import org.apache.drill.shaded.guava.com.google.common.collect.Table; @@ -202,6 +207,12 @@ public class ConvertMetadataAggregateToDirectScanRule extends RelOptRule { // populates record list with row group column metadata for (SchemaPath schemaPath : interestingColumns) { ColumnStatistics<?> columnStatistics = rowGroupMetadata.getColumnsStatistics().get(schemaPath); + + // do not gather statistics for array columns as it is not supported by Metastore + if (containsArrayColumn(rowGroupMetadata.getSchema(), schemaPath)) { + return null; + } + if (IsPredicate.isNullOrEmpty(columnStatistics)) { logger.debug("Statistics for {} column wasn't found within {} row group.", schemaPath, path); return null; @@ -215,7 +226,7 @@ public class ConvertMetadataAggregateToDirectScanRule extends RelOptRule { } else { statsValue = 
columnStatistics.get(statisticsKind); } - String columnStatisticsFieldName = AnalyzeColumnUtils.getColumnStatisticsFieldName(schemaPath.getRootSegmentPath(), statisticsKind); + String columnStatisticsFieldName = AnalyzeColumnUtils.getColumnStatisticsFieldName(schemaPath.toExpr(), statisticsKind); if (statsValue != null) { schema.putIfAbsent( columnStatisticsFieldName, @@ -268,4 +279,31 @@ public class ConvertMetadataAggregateToDirectScanRule extends RelOptRule { return new DirectGroupScan(reader, scanStats); } + + /** + * Checks whether schema path contains array segment. + * + * @param schema tuple schema + * @param schemaPath schema path + * @return {@code true} if any segment in the schema path is an array, {@code false} otherwise + */ + private static boolean containsArrayColumn(TupleMetadata schema, SchemaPath schemaPath) { + ColumnMetadata columnMetadata = SchemaPathUtils.getColumnMetadata(schemaPath, schema); + PathSegment currentPath = schemaPath.getRootSegment(); + ColumnMetadata currentColumn = columnMetadata; + do { + if (currentColumn.isArray()) { + return false; + } else if (columnMetadata.isMap()) { + currentPath = currentPath.getChild(); + columnMetadata = columnMetadata.tupleSchema().metadata(currentPath.getNameSegment().getPath()); + } else if (columnMetadata.isDict()) { + currentPath = currentPath.getChild(); + columnMetadata = ((DictColumnMetadata) columnMetadata).valueColumnMetadata(); + } else { + return true; + } + } while (columnMetadata != null); + return true; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index 9541006..4fd5064 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -255,8 +255,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(64, pageSize, 10); // TODO: Replace ParquetColumnChunkPageWriteStore with ColumnChunkPageWriteStore from parquet library // once PARQUET-1006 will be resolved - pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, initialSlabSize, - pageSize, new ParquetDirectByteBufferAllocator(oContext)); ParquetProperties parquetProperties = ParquetProperties.builder() .withPageSize(pageSize) .withDictionaryEncoding(enableDictionary) @@ -265,6 +263,10 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { .withAllocator(new ParquetDirectByteBufferAllocator(oContext)) .withValuesWriterFactory(new DefaultV1ValuesWriterFactory()) .build(); + pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, initialSlabSize, + pageSize, parquetProperties.getAllocator(), parquetProperties.getPageWriteChecksumEnabled(), + parquetProperties.getColumnIndexTruncateLength() + ); store = new ColumnWriteStoreV1(pageStore, parquetProperties); MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema); consumer = columnIO.getRecordWriter(store); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java index 0bad959..074709d 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java @@ -57,6 +57,7 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; import org.joda.time.DateTimeConstants; import java.math.BigDecimal; @@ -264,7 +265,7 @@ public class ParquetTableMetadataUtils { SchemaPath colPath = SchemaPath.getCompoundPath(column.getName()); Long nulls = column.getNulls(); - if (!column.isNumNullsSet() || nulls == null) { + if (hasInvalidStatistics(column, tableMetadata)) { nulls = Statistic.NO_COLUMN_STATS; } PrimitiveType.PrimitiveTypeName primitiveType = getPrimitiveTypeName(tableMetadata, column); @@ -280,6 +281,13 @@ public class ParquetTableMetadataUtils { return columnsStatistics; } + private static boolean hasInvalidStatistics(MetadataBase.ColumnMetadata column, + MetadataBase.ParquetTableMetadataBase tableMetadata) { + return !column.isNumNullsSet() || ((column.getMinValue() == null || column.getMaxValue() == null) + && column.getNulls() == 0 + && tableMetadata.getRepetition(column.getName()) == Type.Repetition.REQUIRED); + } + /** * Returns the non-interesting column's metadata * @param parquetTableMetadata the source of column metadata for non-interesting column's statistics diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java index 01a5485..b8f707d 100644 --- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java +++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java @@ -19,13 +19,14 @@ package org.apache.parquet.hadoop; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.zip.CRC32; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; -import org.apache.drill.shaded.guava.com.google.common.collect.Maps; -import org.apache.drill.shaded.guava.com.google.common.collect.Sets; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.CapacityByteArrayOutputStream; import org.apache.parquet.column.ColumnDescriptor; @@ -36,6 +37,8 @@ import org.apache.parquet.column.page.PageWriter; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; +import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; +import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; import org.apache.parquet.io.ParquetEncodingException; import org.apache.parquet.schema.MessageType; import org.apache.parquet.bytes.ByteBufferAllocator; @@ -53,17 +56,20 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); - private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = Maps.newHashMap(); + private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<>(); private final MessageType schema; public 
ParquetColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSlabSize, int maxCapacityHint, - ByteBufferAllocator allocator) { + ByteBufferAllocator allocator, + boolean pageWriteChecksumEnabled, + int columnIndexTruncateLength) { this.schema = schema; for (ColumnDescriptor path : schema.getColumns()) { - writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSlabSize, maxCapacityHint, allocator)); + writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSlabSize, + maxCapacityHint, allocator, pageWriteChecksumEnabled, columnIndexTruncateLength)); } } @@ -105,37 +111,54 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab private int pageCount; // repetition and definition level encodings are used only for v1 pages and don't change - private Set<Encoding> rlEncodings = Sets.newHashSet(); - private Set<Encoding> dlEncodings = Sets.newHashSet(); - private List<Encoding> dataEncodings = Lists.newArrayList(); + private Set<Encoding> rlEncodings = new HashSet<>(); + private Set<Encoding> dlEncodings = new HashSet<>(); + private List<Encoding> dataEncodings = new ArrayList<>(); + private ColumnIndexBuilder columnIndexBuilder; + private OffsetIndexBuilder offsetIndexBuilder; private Statistics totalStatistics; + private final CRC32 crc; + boolean pageWriteChecksumEnabled; + private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSlabSize, int maxCapacityHint, - ByteBufferAllocator allocator) { + ByteBufferAllocator allocator, + boolean pageWriteChecksumEnabled, + int columnIndexTruncateLength) { this.path = path; this.compressor = compressor; this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator); this.totalStatistics = Statistics.createStats(this.path.getPrimitiveType()); + this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength); + this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); + this.pageWriteChecksumEnabled = pageWriteChecksumEnabled; + this.crc = pageWriteChecksumEnabled ? 
new CRC32() : null; + } + + @Override + public void writePage(BytesInput bytesInput, int valueCount, Statistics<?> statistics, Encoding rlEncoding, + Encoding dlEncoding, Encoding valuesEncoding) throws IOException { + // Setting the builders to the no-op ones so no column/offset indexes will be written for this column chunk + columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder(); + offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder(); + + writePage(bytesInput, valueCount, -1, statistics, rlEncoding, dlEncoding, valuesEncoding); } @Override - public void writePage(BytesInput bytes, - int valueCount, - Statistics statistics, - Encoding rlEncoding, - Encoding dlEncoding, - Encoding valuesEncoding) throws IOException { + public void writePage(BytesInput bytes, int valueCount, int rowCount, Statistics statistics, + Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException { long uncompressedSize = bytes.size(); - // Parquet library creates bad metadata if the uncompressed or compressed size of a page exceeds Integer.MAX_VALUE if (uncompressedSize > Integer.MAX_VALUE) { throw new ParquetEncodingException( "Cannot write page larger than Integer.MAX_VALUE bytes: " + uncompressedSize); } + BytesInput compressedBytes = compressor.compress(bytes); long compressedSize = compressedBytes.size(); if (compressedSize > Integer.MAX_VALUE) { @@ -143,26 +166,43 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab "Cannot write compressed page larger than Integer.MAX_VALUE bytes: " + compressedSize); } - parquetMetadataConverter.writeDataPageHeader( - (int)uncompressedSize, - (int)compressedSize, - valueCount, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding, - buf); + + if (pageWriteChecksumEnabled) { + crc.reset(); + crc.update(compressedBytes.toByteArray()); + parquetMetadataConverter.writeDataPageV1Header((int) uncompressedSize, (int) compressedSize, + valueCount, rlEncoding, dlEncoding, valuesEncoding, (int) crc.getValue(), buf); + } else { + parquetMetadataConverter.writeDataPageV1Header((int) uncompressedSize, (int) compressedSize, + valueCount, rlEncoding, dlEncoding, valuesEncoding, buf); + } + this.uncompressedLength += uncompressedSize; this.compressedLength += compressedSize; this.totalValueCount += valueCount; this.pageCount += 1; - this.totalStatistics.mergeStatistics(statistics); + + addStatistics(statistics); + + offsetIndexBuilder.add(toIntWithCheck(buf.size() + compressedSize), rowCount); + compressedBytes.writeAllTo(buf); rlEncodings.add(rlEncoding); dlEncodings.add(dlEncoding); dataEncodings.add(valuesEncoding); } + private void addStatistics(Statistics statistics) { + // Copying the statistics if it is not initialized yet so we have the correct typed one + if (totalStatistics == null) { + totalStatistics = statistics.copy(); + } else { + totalStatistics.mergeStatistics(statistics); + } + + columnIndexBuilder.add(statistics); + } + @Override public void writePageV2(int rowCount, int nullCount, @@ -193,8 +233,12 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab this.compressedLength += compressedSize; this.totalValueCount += valueCount; this.pageCount += 1; - this.totalStatistics.mergeStatistics(statistics); + addStatistics(statistics); + + offsetIndexBuilder.add(toIntWithCheck(buf.size() + compressedSize), rowCount); + + repetitionLevels.writeAllTo(buf); definitionLevels.writeAllTo(buf); compressedData.writeAllTo(buf); @@ -221,21 +265,20 @@ public class 
ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab * @throws IOException if the file can not be created */ public void writeToFileWriter(ParquetFileWriter writer) throws IOException { - writer.startColumn(path, totalValueCount, compressor.getCodecName()); - if (dictionaryPage != null) { - writer.writeDictionaryPage(dictionaryPage); - // tracking the dictionary encoding is handled in writeDictionaryPage + writer.writeColumnChunk(path, totalValueCount, compressor.getCodecName(), + dictionaryPage, BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, + columnIndexBuilder, offsetIndexBuilder, rlEncodings, dlEncodings, dataEncodings); + if (logger.isDebugEnabled()) { + logger.debug( + String.format( + "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s", + buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, new HashSet<>(dataEncodings)) + + (dictionaryPage != null ? String.format( + ", dic { %,d entries, %,dB raw, %,dB comp}", + dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize()) + : "") + ); } - writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, rlEncodings, dlEncodings, dataEncodings); - writer.endColumn(); - logger.debug( - String.format( - "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s", - buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, Sets.newHashSet(dataEncodings)) - + (dictionaryPage != null ? String.format( - ", dic { %,d entries, %,dB raw, %,dB comp}", - dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize()) - : "")); rlEncodings.clear(); dlEncodings.clear(); dataEncodings.clear(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java index 5ad5a83..e7e9c0c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java @@ -30,7 +30,6 @@ import org.apache.drill.exec.store.parquet.metadata.MetadataVersion; import org.apache.drill.test.TestBuilder; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -743,7 +742,6 @@ public class TestParquetMetadataCache extends PlanTestBase { } } - @Ignore // Statistics for INTERVAL is not available (see PARQUET-1064) @Test // DRILL-4139 public void testIntervalDayPartitionPruning() throws Exception { final String intervalDayPartitionTable = "dfs.tmp.`interval_day_partition`"; @@ -769,7 +767,6 @@ public class TestParquetMetadataCache extends PlanTestBase { } } - @Ignore // Statistics for INTERVAL is not available (see PARQUET-1064) @Test // DRILL-4139 public void testIntervalYearPartitionPruning() throws Exception { final String intervalYearPartitionTable = "dfs.tmp.`interval_yr_partition`"; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForDecimal.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForDecimal.java index 6f1f676..36c5670 100644 --- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForDecimal.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForDecimal.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.store.parquet; import org.apache.commons.io.FileUtils; -import org.apache.drill.PlanTestBase; import org.apache.drill.categories.ParquetTest; import org.apache.drill.categories.UnlikelyTest; import org.apache.drill.exec.ExecConstants; @@ -27,7 +26,6 @@ import org.apache.drill.test.ClusterFixtureBuilder; import org.apache.drill.test.ClusterTest; import org.junit.After; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -43,9 +41,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -126,10 +121,12 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { for (String table : Arrays.asList(oldTable, newTable)) { String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(1.00 as decimal(5, 2))", table, column); - String plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=false")); - assertThat(plan, not(containsString("Filter"))); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=1", "usedMetadataFile=false") + .exclude("Filter") + .match(); client.testBuilder() .sqlQuery(query) @@ -160,10 +157,12 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { for (String column : Arrays.asList("part_int_32", "part_int_64")) { String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(2.00 as decimal(5,2))", table, column); - String plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=true")); - assertThat(plan, not(containsString("Filter"))); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=1", "usedMetadataFile=true") + .exclude("Filter") + .match(); client.testBuilder() .sqlQuery(query) @@ -188,10 +187,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(1.05 as decimal(5, 2))", table, column); - String plan = client.queryBuilder().sql(query).explainText(); - // push down does not work for old int decimal types because stats is not read: PARQUET-1322 - assertThat(plan, containsString("numRowGroups=2")); - assertThat(plan, containsString("usedMetadataFile=false")); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=2", "usedMetadataFile=false") + .match(); client.testBuilder() .sqlQuery(query) @@ -213,9 +213,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(1.05 as decimal(5, 2))", table, column); - String plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=true")); + queryBuilder() + .sql(query) + .planMatcher() + 
.include("numRowGroups=1", "usedMetadataFile=true") + .match(); client.testBuilder() .sqlQuery(query) @@ -239,10 +241,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(20.25 as decimal(5, 2))", table, column); - String plan = client.queryBuilder().sql(query).explainText(); - // push down does not work for old int decimal types because stats is not read: PARQUET-1322 - assertThat(plan, containsString("numRowGroups=2")); - assertThat(plan, containsString("usedMetadataFile=true")); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=2", "usedMetadataFile=true") + .match(); client.testBuilder() .sqlQuery(query) @@ -269,9 +272,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(20.25 as decimal(5, 2))", table, column); - String plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=false")); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=1", "usedMetadataFile=false") + .match(); client.testBuilder() .sqlQuery(query) @@ -299,9 +304,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(20.0 as decimal(5, 2))", table, column); - String plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=true")); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=1", "usedMetadataFile=true") + .match(); client.testBuilder() .sqlQuery(query) @@ -331,10 +338,12 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue); String query = String.format("select part_fixed, val_fixed from %s where part_fixed = cast(1.00 as decimal(5, 2))", table); - String plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=false")); - assertThat(plan, not(containsString("Filter"))); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=1", "usedMetadataFile=false") + .exclude("Filter") + .match(); client.testBuilder() .sqlQuery(query) @@ -359,10 +368,12 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue); String query = String.format("select part_fixed, val_fixed from %s where part_fixed = cast(1.00 as decimal(5, 2))", table); - String plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=true")); - assertThat(plan, not(containsString("Filter"))); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=1", "usedMetadataFile=true") + .exclude("Filter") + .match(); client.testBuilder() .sqlQuery(query) @@ -387,10 +398,12 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue); String query = String.format("select part_fixed, val_fixed from %s where part_fixed = cast(2.00 as decimal(5, 2))", table); - 
String plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=true")); - assertThat(plan, not(containsString("Filter"))); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=1", "usedMetadataFile=true") + .exclude("Filter") + .match(); client.testBuilder() .sqlQuery(query) @@ -411,10 +424,13 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { public void testOldFixedDecimalPushDownNoMeta() throws Exception { String table = createTable("old_fixed_decimal_push_down_no_meta", true); String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(1.05 as decimal(5, 2))", table); - String plan = client.queryBuilder().sql(query).explainText(); + + queryBuilder() + .sql(query) + .planMatcher() // statistics for fixed decimal is not available for files generated prior to parquet 1.10.0 version - assertThat(plan, containsString("numRowGroups=2")); - assertThat(plan, containsString("usedMetadataFile=false")); + .include("numRowGroups=2", "usedMetadataFile=false") + .match(); client.testBuilder() .sqlQuery(query) @@ -439,9 +455,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { for (Map.Entry<String, String> property : properties.entrySet()) { client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, property.getKey()); - String plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString(property.getValue())); - assertThat(plan, containsString("usedMetadataFile=true")); + queryBuilder() + .sql(query) + .planMatcher() + .include(property.getValue(), "usedMetadataFile=true") + .match(); client.testBuilder() .sqlQuery(query) @@ -463,9 +481,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { queryBuilder().sql(String.format("refresh table metadata %s", table)).run(); - String plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=2")); - assertThat(plan, containsString("usedMetadataFile=true")); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=2", "usedMetadataFile=true") + .match(); client.testBuilder() .sqlQuery(query) @@ -491,9 +511,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { newTable, dataTable)).run(); String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(1.05 as decimal(5, 2))", newTable); - String plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=false")); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=1", "usedMetadataFile=false") + .match(); client.testBuilder() .sqlQuery(query) @@ -507,9 +529,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { // metadata for binary is allowed only after Drill 1.15.0 // set string signed option to true since test was written on Drill 1.15.0-SNAPSHOT version client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "true"); - plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=true")); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=1", "usedMetadataFile=true") + .match(); client.testBuilder() .sqlQuery(query) @@ -535,10 +559,12 @@ public class 
TestPushDownAndPruningForDecimal extends ClusterTest { + "as select part_int_32 as part_binary, val_int_32 as val_binary from %s", newTable, dataTable)).run(); String query = String.format("select part_binary, val_binary from %s where part_binary = cast(1.00 as decimal(5, 2))", newTable); - String plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=false")); - assertThat(plan, not(containsString("Filter"))); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=1", "usedMetadataFile=false") + .exclude("Filter") + .match(); client.testBuilder() .sqlQuery(query) @@ -551,10 +577,12 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { queryBuilder().sql(String.format("refresh table metadata %s", newTable)).run(); - plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=true")); - assertThat(plan, not(containsString("Filter"))); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=1", "usedMetadataFile=true") + .exclude("Filter") + .match(); client.testBuilder() .sqlQuery(query) @@ -582,9 +610,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { + "as select part_int_32 as part_binary, val_int_32 as val_binary from %s", newTable, dataTable)).run(); String query = String.format("select part_binary, val_binary from %s where val_binary = cast(1.05 as decimal(5, 2))", newTable); - String plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=false")); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=1", "usedMetadataFile=false") + .match(); client.testBuilder() .sqlQuery(query) @@ -598,9 +628,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { // metadata for binary is allowed only after Drill 1.15.0 // set string signed option to true, since test was written on Drill 1.15.0-SNAPSHOT version client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "true"); - plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=true")); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=1", "usedMetadataFile=true") + .match(); client.testBuilder() .sqlQuery(query) @@ -624,10 +656,12 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { for (String decimalType : Arrays.asList("decimal(5, 0)", "decimal(10, 5)", "decimal(5, 1)")) { String query = String.format("select part, val from %s where part = cast(2.0 as %s)", newTable, decimalType); - String plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=false")); - assertThat(plan, not(containsString("Filter"))); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=1", "usedMetadataFile=false") + .exclude("Filter") + .match(); client.testBuilder() .sqlQuery(query) @@ -654,9 +688,11 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { for (String decimalType : Arrays.asList("decimal(5, 0)", "decimal(10, 5)", "decimal(5, 1)")) { String query = String.format("select part, val from %s where val = cast(20.0 as %s)", newTable, 
decimalType); - String plan = client.queryBuilder().sql(query).explainText(); - assertThat(plan, containsString("numRowGroups=1")); - assertThat(plan, containsString("usedMetadataFile=false")); + queryBuilder() + .sql(query) + .planMatcher() + .include("numRowGroups=1", "usedMetadataFile=false") + .match(); client.testBuilder() .sqlQuery(query) @@ -667,7 +703,6 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { } } - @Ignore("Statistics for DECIMAL that has all nulls is not available (PARQUET-1341). Requires upgrade to Parquet 1.11.0") @Test public void testDecimalPruningWithNullPartition() throws Exception { List<String> ctasQueries = new ArrayList<>(); @@ -692,13 +727,26 @@ public class TestPushDownAndPruningForDecimal extends ClusterTest { long actualRowCount = client.queryBuilder().sql(query).run().recordCount(); assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount); - PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"}); + + queryBuilder() + .sql(query) + .planMatcher() + .include("usedMetadataFile=false") + .exclude("Filter") + .match(); queryBuilder().sql(String.format("refresh table metadata %s", decimalPartitionTable)).run(); actualRowCount = client.queryBuilder().sql(query).run().recordCount(); assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount); - PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"}); + + + queryBuilder() + .sql(query) + .planMatcher() + .include("usedMetadataFile=true") + .exclude("Filter") + .match(); } finally { client.runSqlSilently(String.format("drop table if exists %s", decimalPartitionTable)); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForVarchar.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForVarchar.java index bd93cd3..a8a9364 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForVarchar.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForVarchar.java @@ -26,7 +26,6 @@ import org.apache.drill.test.ClusterFixtureBuilder; import org.apache.drill.test.ClusterTest; import org.apache.drill.test.QueryBuilder; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -196,7 +195,6 @@ public class TestPushDownAndPruningForVarchar extends ClusterTest { } } - @Ignore("Statistics for VARCHAR that has all nulls is not available (PARQUET-1341). Requires upgrade to Parquet 1.11.0.") @Test public void testNewFilesPruningWithNullPartition() throws Exception { String table = "dfs.`tmp`.`varchar_pruning_new_with_null_partition`"; diff --git a/pom.xml b/pom.xml index f26c218..359f059 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,7 @@ <shaded.guava.version>23.0</shaded.guava.version> <guava.version>19.0</guava.version> <forkCount>2</forkCount> - <parquet.version>1.10.0</parquet.version> + <parquet.version>1.11.0</parquet.version> <!-- For development purposes to be able to use custom Calcite versions (e.g. not present in jitpack repository or from local repository) update this property to desired value (e.g. org.apache.calcite). 
@@ -1683,7 +1683,7 @@ <dependency> <groupId>org.apache.parquet</groupId> <artifactId>parquet-format</artifactId> - <version>2.5.0</version> + <version>2.8.0</version> <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId>
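
For reference, the page-write checksums wired in above (pageWriteChecksumEnabled) follow what the pageWriteChecksumEnabled branch of ColumnChunkPageWriter.writePage shows: a CRC-32 is computed over the compressed page bytes and its value is written into the v1 data page header. Below is a minimal standalone sketch of that calculation only; the class and method names are illustrative and are not Drill or Parquet APIs.

    import java.nio.charset.StandardCharsets;
    import java.util.zip.CRC32;

    public class PageChecksumSketch {

      // Mirrors the pageWriteChecksumEnabled branch above: the checksum is taken
      // over the compressed page bytes, and the 32-bit CRC value goes into the
      // data page header alongside the sizes and encodings.
      static int pageCrc(byte[] compressedPageBytes) {
        CRC32 crc = new CRC32();
        crc.reset();
        crc.update(compressedPageBytes);
        return (int) crc.getValue();
      }

      public static void main(String[] args) {
        byte[] fakePage = "example compressed page bytes".getBytes(StandardCharsets.UTF_8);
        System.out.printf("crc32 = %08x%n", pageCrc(fakePage));
      }
    }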
