This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit a7770e22f974b452806620fff8eccd04bdff1844
Author: JingsongLi <[email protected]>
AuthorDate: Mon Apr 21 13:54:49 2025 +0800

    [parquet] Apply Parquet 1.15 changes
---
 .../apache/parquet/hadoop/ParquetFileReader.java   | 148 +++++++++++++--------
 .../org/apache/parquet/hadoop/ParquetWriter.java   |  33 ++++-
 .../filter2/columnindex/ColumnIndexFilter.java     |   9 +-
 3 files changed, 135 insertions(+), 55 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index bebdba7670..256eec1a1c 100644
--- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -23,7 +23,6 @@
 import org.apache.paimon.format.parquet.ParquetInputStream;
 import org.apache.paimon.fs.VectoredReadable;
 import org.apache.paimon.utils.RoaringBitmap32;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Preconditions;
@@ -79,11 +78,13 @@ import org.apache.parquet.io.ParquetFileRange;
 import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.util.AutoCloseables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import javax.annotation.Nullable;

+import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -246,7 +247,7 @@ public class ParquetFileReader implements Closeable {
     private ColumnChunkPageReadStore currentRowGroup = null;
     private DictionaryPageReader nextDictionaryReader = null;

-    private InternalFileDecryptor fileDecryptor;
+    private InternalFileDecryptor fileDecryptor = null;

     public ParquetFileReader(
             InputFile file, ParquetReadOptions options, @Nullable RoaringBitmap32 selection)
@@ -285,6 +286,7 @@ public class ParquetFileReader implements Closeable {
         for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
             paths.put(ColumnPath.get(col.getPath()), col);
         }
+
         if (options.usePageChecksumVerification()) {
             this.crc = new CRC32();
             this.crcAllocator = ReusingByteBufferAllocator.strict(options.getAllocator());
@@ -298,18 +300,6 @@ public class ParquetFileReader implements Closeable {
         return new ArrayList<>(Collections.nCopies(size, null));
     }

-    private boolean checkRowIndexOffsetExists(List<BlockMetaData> blocks) {
-        for (BlockMetaData block : blocks) {
-            if (block.getRowIndexOffset() == -1) {
-                LOG.warn(
-                        "Row index offset was not found in block metadata of file {}, skip applying filter in order to get the correct row position",
-                        file.getPath());
-                return false;
-            }
-        }
-        return true;
-    }
-
     public ParquetMetadata getFooter() {
         if (footer == null) {
             try {
@@ -364,35 +354,44 @@ public class ParquetFileReader implements Closeable {
     private List<BlockMetaData> filterRowGroups(List<BlockMetaData> blocks) throws IOException {
         FilterCompat.Filter recordFilter = options.getRecordFilter();
-        if (checkRowIndexOffsetExists(blocks)) {
-            if (FilterCompat.isFilteringRequired(recordFilter)) {
-                // set up data filters based on configured levels
-                List<RowGroupFilter.FilterLevel> levels = new ArrayList<>();
-                if (options.useStatsFilter()) {
-                    levels.add(STATISTICS);
-                }
+        for (BlockMetaData block : blocks) {
+            if (block.getRowIndexOffset() == -1) {
+                LOG.warn(
+                        "Row index offset was not found in block metadata of file {}, "
+                                + "skip applying filter in order to get the correct row position",
+                        file.getPath());
+                return blocks;
+            }
+        }

-            if (options.useDictionaryFilter()) {
-                levels.add(DICTIONARY);
-            }
+        if (FilterCompat.isFilteringRequired(recordFilter)) {
+            // set up data filters based on configured levels
+            List<RowGroupFilter.FilterLevel> levels = new ArrayList<>();

-            if (options.useBloomFilter()) {
-                levels.add(BLOOMFILTER);
-            }
-            blocks = RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this);
+            if (options.useStatsFilter()) {
+                levels.add(STATISTICS);
             }

-            if (selection != null) {
-                blocks =
-                        blocks.stream()
-                                .filter(
-                                        it ->
-                                                selection.intersects(
-                                                        it.getRowIndexOffset(),
-                                                        it.getRowIndexOffset() + it.getRowCount()))
-                                .collect(Collectors.toList());
+            if (options.useDictionaryFilter()) {
+                levels.add(DICTIONARY);
             }
+
+            if (options.useBloomFilter()) {
+                levels.add(BLOOMFILTER);
+            }
+            blocks = RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this);
+        }
+
+        if (selection != null) {
+            blocks =
+                    blocks.stream()
+                            .filter(
+                                    it ->
+                                            selection.intersects(
+                                                    it.getRowIndexOffset(),
+                                                    it.getRowIndexOffset() + it.getRowCount()))
+                            .collect(Collectors.toList());
         }

         return blocks;
@@ -421,8 +420,8 @@ public class ParquetFileReader implements Closeable {
      * Reads all the columns requested from the row group at the specified block.
      *
      * @param blockIndex the index of the requested block
-     * @throws IOException if an error occurs while reading
      * @return the PageReadStore which can provide PageReaders for each column.
+     * @throws IOException if an error occurs while reading
      */
     public PageReadStore readRowGroup(int blockIndex) throws IOException {
         return internalReadRowGroup(blockIndex);
@@ -431,8 +430,8 @@ public class ParquetFileReader implements Closeable {
     /**
      * Reads all the columns requested from the row group at the current file position.
      *
-     * @throws IOException if an error occurs while reading
      * @return the PageReadStore which can provide PageReaders for each column.
+     * @throws IOException if an error occurs while reading
      */
     public PageReadStore readNextRowGroup() throws IOException {
         ColumnChunkPageReadStore rowGroup = null;
@@ -584,7 +583,8 @@ public class ParquetFileReader implements Closeable {
      */
     private void readAllPartsVectoredOrNormal(
             List<ConsecutivePartList> allParts, ChunkListBuilder builder) throws IOException {
-        if (shouldUseVectoredIo()) {
+
+        if (shouldUseVectoredIo(allParts)) {
             try {
                 readVectored(allParts, builder);
                 return;
@@ -599,9 +599,35 @@ public class ParquetFileReader implements Closeable {
         }
     }

-    /** Should the read use vectored IO. */
-    private boolean shouldUseVectoredIo() {
-        return f.in() instanceof VectoredReadable;
+    /**
+     * Should the read use vectored IO?
+     *
+     * <p>Use Paimon vectored io.
+     *
+     * @param allParts all parts to read.
+     * @return true or false.
+     */
+    private boolean shouldUseVectoredIo(final List<ConsecutivePartList> allParts) {
+        return f.in() instanceof VectoredReadable && arePartsValidForVectoredIo(allParts);
+    }
+
+    /**
+     * Validate the parts for vectored IO. Vectored IO doesn't support reading ranges of size
+     * greater than Integer.MAX_VALUE.
+     *
+     * @param allParts all parts to read.
+     * @return true or false.
+     */
+    private boolean arePartsValidForVectoredIo(List<ConsecutivePartList> allParts) {
+        for (ConsecutivePartList consecutivePart : allParts) {
+            if (consecutivePart.length >= Integer.MAX_VALUE) {
+                LOG.debug(
+                        "Part length {} greater than Integer.MAX_VALUE thus disabling vectored IO",
+                        consecutivePart.length);
+                return false;
+            }
+        }
+        return true;
     }

     /**
@@ -621,7 +647,6 @@ public class ParquetFileReader implements Closeable {
      * @throws IllegalArgumentException arguments are invalid.
      * @throws UnsupportedOperationException if the filesystem does not support vectored IO.
      */
-    @SuppressWarnings("checkstyle:JavadocParagraph")
     private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
             throws IOException {
@@ -730,7 +755,6 @@ public class ParquetFileReader implements Closeable {
                 }
             }
         }
-        // actually read all the chunks
         readAllPartsVectoredOrNormal(allParts, builder);
         rowGroup.setReleaser(builder.releaser);
         for (Chunk chunk : builder.build()) {
@@ -802,6 +826,10 @@ public class ParquetFileReader implements Closeable {
         // update the current block and instantiate a dictionary reader for it
         ++currentBlock;
+
+        if (nextDictionaryReader != null) {
+            nextDictionaryReader.close();
+        }
         this.nextDictionaryReader = null;

         return true;
@@ -977,12 +1005,25 @@ public class ParquetFileReader implements Closeable {
             }
         }

-        // Read Bloom filter data header.
+        // Seek to Bloom filter offset.
         f.seek(bloomFilterOffset);
+
+        // Read Bloom filter length.
+        int bloomFilterLength = meta.getBloomFilterLength();
+
+        // If it is set, read Bloom filter header and bitset together.
+        // Otherwise, read Bloom filter header first and then bitset.
+        InputStream in = f;
+        if (bloomFilterLength > 0) {
+            byte[] headerAndBitSet = new byte[bloomFilterLength];
+            f.readFully(headerAndBitSet);
+            in = new ByteArrayInputStream(headerAndBitSet);
+        }
+
         BloomFilterHeader bloomFilterHeader;
         try {
             bloomFilterHeader =
-                    Util.readBloomFilterHeader(f, bloomFilterDecryptor, bloomFilterHeaderAAD);
+                    Util.readBloomFilterHeader(in, bloomFilterDecryptor, bloomFilterHeaderAAD);
         } catch (IOException e) {
             LOG.warn("read no bloom filter");
             return null;
         }
@@ -1010,9 +1051,9 @@ public class ParquetFileReader implements Closeable {
         byte[] bitset;
         if (null == bloomFilterDecryptor) {
             bitset = new byte[numBytes];
-            f.readFully(bitset);
+            in.read(bitset);
         } else {
-            bitset = bloomFilterDecryptor.decrypt(f, bloomFilterBitsetAAD);
+            bitset = bloomFilterDecryptor.decrypt(in, bloomFilterBitsetAAD);
             if (bitset.length != numBytes) {
                 throw new ParquetCryptoRuntimeException(
                         "Wrong length of decrypted bloom filter bitset");
@@ -1022,11 +1063,12 @@ public class ParquetFileReader implements Closeable {
     }

     /**
+     * Class should be considered private.
+     *
      * @param column the column chunk which the column index is to be returned for
      * @return the column index for the specified column chunk or {@code null} if there is no index
      * @throws IOException if any I/O error occurs during reading the file
      */
-    @Private
     public ColumnIndex readColumnIndex(ColumnChunkMetaData column) throws IOException {
         IndexReference ref = column.getColumnIndexReference();
         if (ref == null) {
@@ -1056,11 +1098,12 @@ public class ParquetFileReader implements Closeable {
     }

     /**
+     * Class should be considered private.
+     *
      * @param column the column chunk which the offset index is to be returned for
      * @return the offset index for the specified column chunk or {@code null} if there is no index
      * @throws IOException if any I/O error occurs during reading the file
      */
-    @Private
     public OffsetIndex readOffsetIndex(ColumnChunkMetaData column) throws IOException {
         IndexReference ref = column.getOffsetIndexReference();
         if (ref == null) {
@@ -1095,6 +1138,7 @@ public class ParquetFileReader implements Closeable {
                 f.close();
             }
         } finally {
+            AutoCloseables.uncheckedClose(nextDictionaryReader, crcAllocator);
             options.getCodecFactory().release();
         }
     }
@@ -1200,7 +1244,7 @@ public class ParquetFileReader implements Closeable {
         }

         /**
-         * Read all the pages in a given column chunk.
+         * Read all of the pages in a given column chunk.
          *
          * @return the list of pages
          */
diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index c6a0eb6212..79a2a10695 100644
--- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -30,6 +30,8 @@ import org.apache.parquet.schema.MessageType;

 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;

 /**
  * Write records to a Parquet file.
@@ -67,7 +69,6 @@ public class ParquetWriter<T> implements Closeable {
             int maxPaddingSize,
             ParquetProperties encodingProps)
             throws IOException {
-
         WriteSupport.WriteContext writeContext = writeSupport.init(conf);
         MessageType schema = writeContext.getSchema();

@@ -86,12 +87,40 @@ public class ParquetWriter<T> implements Closeable {
         this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
         CodecFactory.BytesCompressor compressor =
                 codecFactory.getCompressor(compressionCodecName);
+
+        final Map<String, String> extraMetadata;
+        if (encodingProps.getExtraMetaData() == null
+                || encodingProps.getExtraMetaData().isEmpty()) {
+            extraMetadata = writeContext.getExtraMetaData();
+        } else {
+            extraMetadata = new HashMap<>(writeContext.getExtraMetaData());
+
+            encodingProps
+                    .getExtraMetaData()
+                    .forEach(
+                            (metadataKey, metadataValue) -> {
+                                if (metadataKey.equals(OBJECT_MODEL_NAME_PROP)) {
+                                    throw new IllegalArgumentException(
+                                            "Cannot overwrite metadata key "
+                                                    + OBJECT_MODEL_NAME_PROP
+                                                    + ". Please use another key name.");
+                                }
+
+                                if (extraMetadata.put(metadataKey, metadataValue) != null) {
+                                    throw new IllegalArgumentException(
+                                            "Duplicate metadata key "
+                                                    + metadataKey
+                                                    + ". Please use another key name.");
+                                }
+                            });
+        }
+
         this.writer =
                 new InternalParquetRecordWriter<T>(
                         fileWriter,
                         writeSupport,
                         schema,
-                        writeContext.getExtraMetaData(),
+                        extraMetadata,
                         rowGroupSize,
                         compressor,
                         validating,
diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
index d8a06950fa..d640d379ba 100644
--- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
+++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
@@ -28,6 +28,7 @@ import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
 import org.apache.parquet.filter2.predicate.Operators;
 import org.apache.parquet.filter2.predicate.Operators.And;
 import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.Contains;
 import org.apache.parquet.filter2.predicate.Operators.Eq;
 import org.apache.parquet.filter2.predicate.Operators.Gt;
 import org.apache.parquet.filter2.predicate.Operators.GtEq;
@@ -195,6 +196,12 @@ public class ColumnIndexFilter implements Visitor<RowRanges> {
                 notIn.getColumn(), ci -> ci.visit(notIn), isNull ? RowRanges.EMPTY : allRows());
     }

+    @Override
+    public <T extends Comparable<T>> RowRanges visit(Contains<T> contains) {
+        return contains.filter(
+                this, RowRanges::intersection, RowRanges::union, ranges -> allRows());
+    }
+
     @Override
     public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> RowRanges visit(
             UserDefined<T, U> udp) {
@@ -239,7 +246,7 @@ public class ColumnIndexFilter implements Visitor<RowRanges> {
     @Override
     public RowRanges visit(And and) {
         RowRanges leftResult = and.getLeft().accept(this);
-        if (leftResult.getRanges().size() == 0) {
+        if (leftResult.getRanges().isEmpty()) {
             return leftResult;
         }
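A note on the Bloom filter change in ParquetFileReader above: when the footer records the filter's total serialized length, the reader now pulls the header and bitset into memory with one positioned read and parses both from that buffer, instead of issuing two separate reads against the file. Below is a minimal standalone sketch of that pattern using only java.io; the class and method names are illustrative stand-ins, not the Paimon or Parquet API.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

class LengthPrefixedRegionRead {

    /**
     * Reads a region whose total length is known up front into one buffer, so the
     * header and the payload that follows it are parsed from memory rather than
     * with two separate reads against the underlying file.
     */
    static InputStream openRegion(DataInputStream file, int knownLength) throws IOException {
        if (knownLength > 0) {
            byte[] headerAndPayload = new byte[knownLength];
            file.readFully(headerAndPayload); // one I/O call covers header + payload
            return new ByteArrayInputStream(headerAndPayload);
        }
        // Length not recorded in the footer: fall back to streaming from the file.
        return file;
    }
}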

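Similarly, the ParquetWriter hunk merges writer-level extra metadata into the metadata supplied by the WriteSupport, rejecting the reserved object-model key and any duplicate key. A standalone sketch of that merge rule follows; the class name and the reserved-key constant are hypothetical stand-ins (the real code uses OBJECT_MODEL_NAME_PROP), not the vendored API.

import java.util.HashMap;
import java.util.Map;

class MetadataMerge {

    // Stand-in for the reserved key that must never be overwritten.
    static final String RESERVED_KEY = "writer.model.name";

    /** Merges user-supplied metadata into base metadata, failing fast on conflicts. */
    static Map<String, String> merge(Map<String, String> base, Map<String, String> extra) {
        if (extra == null || extra.isEmpty()) {
            return base;
        }
        Map<String, String> merged = new HashMap<>(base);
        extra.forEach(
                (key, value) -> {
                    if (key.equals(RESERVED_KEY)) {
                        throw new IllegalArgumentException(
                                "Cannot overwrite metadata key " + RESERVED_KEY);
                    }
                    // Map.put returns the previous value, so a non-null result
                    // means the key already existed in the base metadata.
                    if (merged.put(key, value) != null) {
                        throw new IllegalArgumentException("Duplicate metadata key " + key);
                    }
                });
        return merged;
    }
}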