This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit a7770e22f974b452806620fff8eccd04bdff1844
Author: JingsongLi <[email protected]>
AuthorDate: Mon Apr 21 13:54:49 2025 +0800

    [parquet] Apply Parquet 1.15 changes
---
 .../apache/parquet/hadoop/ParquetFileReader.java   | 148 +++++++++++++--------
 .../org/apache/parquet/hadoop/ParquetWriter.java   |  33 ++++-
 .../filter2/columnindex/ColumnIndexFilter.java     |   9 +-
 3 files changed, 135 insertions(+), 55 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index bebdba7670..256eec1a1c 100644
--- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -23,7 +23,6 @@ import org.apache.paimon.format.parquet.ParquetInputStream;
 import org.apache.paimon.fs.VectoredReadable;
 import org.apache.paimon.utils.RoaringBitmap32;
 
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Preconditions;
@@ -79,11 +78,13 @@ import org.apache.parquet.io.ParquetFileRange;
 import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.util.AutoCloseables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -246,7 +247,7 @@ public class ParquetFileReader implements Closeable {
     private ColumnChunkPageReadStore currentRowGroup = null;
     private DictionaryPageReader nextDictionaryReader = null;
 
-    private InternalFileDecryptor fileDecryptor;
+    private InternalFileDecryptor fileDecryptor = null;
 
     public ParquetFileReader(
             InputFile file, ParquetReadOptions options, @Nullable RoaringBitmap32 selection)
@@ -285,6 +286,7 @@ public class ParquetFileReader implements Closeable {
         for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
             paths.put(ColumnPath.get(col.getPath()), col);
         }
+
         if (options.usePageChecksumVerification()) {
             this.crc = new CRC32();
             this.crcAllocator = ReusingByteBufferAllocator.strict(options.getAllocator());
@@ -298,18 +300,6 @@ public class ParquetFileReader implements Closeable {
         return new ArrayList<>(Collections.nCopies(size, null));
     }
 
-    private boolean checkRowIndexOffsetExists(List<BlockMetaData> blocks) {
-        for (BlockMetaData block : blocks) {
-            if (block.getRowIndexOffset() == -1) {
-                LOG.warn(
-                        "Row index offset was not found in block metadata of file {}, skip applying filter in order to get the correct row position",
-                        file.getPath());
-                return false;
-            }
-        }
-        return true;
-    }
-
     public ParquetMetadata getFooter() {
         if (footer == null) {
             try {
@@ -364,35 +354,44 @@ public class ParquetFileReader implements Closeable {
 
     private List<BlockMetaData> filterRowGroups(List<BlockMetaData> blocks) throws IOException {
         FilterCompat.Filter recordFilter = options.getRecordFilter();
-        if (checkRowIndexOffsetExists(blocks)) {
-            if (FilterCompat.isFilteringRequired(recordFilter)) {
-                // set up data filters based on configured levels
-                List<RowGroupFilter.FilterLevel> levels = new ArrayList<>();
 
-                if (options.useStatsFilter()) {
-                    levels.add(STATISTICS);
-                }
+        for (BlockMetaData block : blocks) {
+            if (block.getRowIndexOffset() == -1) {
+                LOG.warn(
+                        "Row index offset was not found in block metadata of file {}, "
+                                + "skip applying filter in order to get the correct row position",
+                        file.getPath());
+                return blocks;
+            }
+        }
 
-                if (options.useDictionaryFilter()) {
-                    levels.add(DICTIONARY);
-                }
+        if (FilterCompat.isFilteringRequired(recordFilter)) {
+            // set up data filters based on configured levels
+            List<RowGroupFilter.FilterLevel> levels = new ArrayList<>();
 
-                if (options.useBloomFilter()) {
-                    levels.add(BLOOMFILTER);
-                }
-                blocks = RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this);
+            if (options.useStatsFilter()) {
+                levels.add(STATISTICS);
             }
 
-            if (selection != null) {
-                blocks =
-                        blocks.stream()
-                                .filter(
-                                        it ->
-                                                selection.intersects(
-                                                        it.getRowIndexOffset(),
-                                                        it.getRowIndexOffset() + it.getRowCount()))
-                                .collect(Collectors.toList());
+            if (options.useDictionaryFilter()) {
+                levels.add(DICTIONARY);
             }
+
+            if (options.useBloomFilter()) {
+                levels.add(BLOOMFILTER);
+            }
+            blocks = RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this);
+        }
+
+        if (selection != null) {
+            blocks =
+                    blocks.stream()
+                            .filter(
+                                    it ->
+                                            selection.intersects(
+                                                    it.getRowIndexOffset(),
+                                                    it.getRowIndexOffset() + it.getRowCount()))
+                            .collect(Collectors.toList());
         }
 
         return blocks;
@@ -421,8 +420,8 @@ public class ParquetFileReader implements Closeable {
      * Reads all the columns requested from the row group at the specified block.
      *
      * @param blockIndex the index of the requested block
-     * @throws IOException if an error occurs while reading
      * @return the PageReadStore which can provide PageReaders for each column.
+     * @throws IOException if an error occurs while reading
      */
     public PageReadStore readRowGroup(int blockIndex) throws IOException {
         return internalReadRowGroup(blockIndex);
@@ -431,8 +430,8 @@ public class ParquetFileReader implements Closeable {
     /**
      * Reads all the columns requested from the row group at the current file position.
      *
-     * @throws IOException if an error occurs while reading
      * @return the PageReadStore which can provide PageReaders for each column.
+     * @throws IOException if an error occurs while reading
      */
     public PageReadStore readNextRowGroup() throws IOException {
         ColumnChunkPageReadStore rowGroup = null;
@@ -584,7 +583,8 @@ public class ParquetFileReader implements Closeable {
      */
     private void readAllPartsVectoredOrNormal(
             List<ConsecutivePartList> allParts, ChunkListBuilder builder) throws IOException {
-        if (shouldUseVectoredIo()) {
+
+        if (shouldUseVectoredIo(allParts)) {
             try {
                 readVectored(allParts, builder);
                 return;
@@ -599,9 +599,35 @@ public class ParquetFileReader implements Closeable {
         }
     }
 
-    /** Should the read use vectored IO. */
-    private boolean shouldUseVectoredIo() {
-        return f.in() instanceof VectoredReadable;
+    /**
+     * Should the read use vectored IO?
+     *
+     * <p>Use Paimon vectored io.
+     *
+     * @param allParts all parts to read.
+     * @return true or false.
+     */
+    private boolean shouldUseVectoredIo(final List<ConsecutivePartList> allParts) {
+        return f.in() instanceof VectoredReadable && arePartsValidForVectoredIo(allParts);
+    }
+
+    /**
+     * Validate the parts for vectored IO. Vectored IO doesn't support reading ranges of size
+     * greater than Integer.MAX_VALUE.
+     *
+     * @param allParts all parts to read.
+     * @return true or false.
+     */
+    private boolean arePartsValidForVectoredIo(List<ConsecutivePartList> allParts) {
+        for (ConsecutivePartList consecutivePart : allParts) {
+            if (consecutivePart.length >= Integer.MAX_VALUE) {
+                LOG.debug(
+                        "Part length {} greater than Integer.MAX_VALUE thus disabling vectored IO",
+                        consecutivePart.length);
+                return false;
+            }
+        }
+        return true;
     }
 
     /**
@@ -621,7 +647,6 @@ public class ParquetFileReader implements Closeable {
      * @throws IllegalArgumentException arguments are invalid.
      * @throws UnsupportedOperationException if the filesystem does not support vectored IO.
      */
-    @SuppressWarnings("checkstyle:JavadocParagraph")
     private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
             throws IOException {
 
@@ -730,7 +755,6 @@ public class ParquetFileReader implements Closeable {
                 }
             }
         }
-        // actually read all the chunks
         readAllPartsVectoredOrNormal(allParts, builder);
         rowGroup.setReleaser(builder.releaser);
         for (Chunk chunk : builder.build()) {
@@ -802,6 +826,10 @@ public class ParquetFileReader implements Closeable {
 
         // update the current block and instantiate a dictionary reader for it
         ++currentBlock;
+
+        if (nextDictionaryReader != null) {
+            nextDictionaryReader.close();
+        }
         this.nextDictionaryReader = null;
 
         return true;
@@ -977,12 +1005,25 @@ public class ParquetFileReader implements Closeable {
             }
         }
 
-        // Read Bloom filter data header.
+        // Seek to Bloom filter offset.
         f.seek(bloomFilterOffset);
+
+        // Read Bloom filter length.
+        int bloomFilterLength = meta.getBloomFilterLength();
+
+        // If it is set, read Bloom filter header and bitset together.
+        // Otherwise, read Bloom filter header first and then bitset.
+        InputStream in = f;
+        if (bloomFilterLength > 0) {
+            byte[] headerAndBitSet = new byte[bloomFilterLength];
+            f.readFully(headerAndBitSet);
+            in = new ByteArrayInputStream(headerAndBitSet);
+        }
+
         BloomFilterHeader bloomFilterHeader;
         try {
             bloomFilterHeader =
-                    Util.readBloomFilterHeader(f, bloomFilterDecryptor, bloomFilterHeaderAAD);
+                    Util.readBloomFilterHeader(in, bloomFilterDecryptor, bloomFilterHeaderAAD);
         } catch (IOException e) {
             LOG.warn("read no bloom filter");
             return null;
@@ -1010,9 +1051,9 @@ public class ParquetFileReader implements Closeable {
         byte[] bitset;
         if (null == bloomFilterDecryptor) {
             bitset = new byte[numBytes];
-            f.readFully(bitset);
+            in.read(bitset);
         } else {
-            bitset = bloomFilterDecryptor.decrypt(f, bloomFilterBitsetAAD);
+            bitset = bloomFilterDecryptor.decrypt(in, bloomFilterBitsetAAD);
             if (bitset.length != numBytes) {
                 throw new ParquetCryptoRuntimeException(
                         "Wrong length of decrypted bloom filter bitset");
@@ -1022,11 +1063,12 @@ public class ParquetFileReader implements Closeable {
     }
 
     /**
+     * Class should be considered private.
+     *
      * @param column the column chunk which the column index is to be returned for
      * @return the column index for the specified column chunk or {@code null} if there is no index
      * @throws IOException if any I/O error occurs during reading the file
      */
-    @Private
     public ColumnIndex readColumnIndex(ColumnChunkMetaData column) throws IOException {
         IndexReference ref = column.getColumnIndexReference();
         if (ref == null) {
@@ -1056,11 +1098,12 @@ public class ParquetFileReader implements Closeable {
     }
 
     /**
+     * Class should be considered private.
+     *
      * @param column the column chunk which the offset index is to be returned for
      * @return the offset index for the specified column chunk or {@code null} if there is no index
      * @throws IOException if any I/O error occurs during reading the file
      */
-    @Private
     public OffsetIndex readOffsetIndex(ColumnChunkMetaData column) throws IOException {
         IndexReference ref = column.getOffsetIndexReference();
         if (ref == null) {
@@ -1095,6 +1138,7 @@ public class ParquetFileReader implements Closeable {
                 f.close();
             }
         } finally {
+            AutoCloseables.uncheckedClose(nextDictionaryReader, crcAllocator);
             options.getCodecFactory().release();
         }
     }
@@ -1200,7 +1244,7 @@ public class ParquetFileReader implements Closeable {
         }
 
         /**
-         * Read all the pages in a given column chunk.
+         * Read all of the pages in a given column chunk.
          *
          * @return the list of pages
          */
diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index c6a0eb6212..79a2a10695 100644
--- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -30,6 +30,8 @@ import org.apache.parquet.schema.MessageType;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Write records to a Parquet file.
@@ -67,7 +69,6 @@ public class ParquetWriter<T> implements Closeable {
             int maxPaddingSize,
             ParquetProperties encodingProps)
             throws IOException {
-
         WriteSupport.WriteContext writeContext = writeSupport.init(conf);
         MessageType schema = writeContext.getSchema();
 
@@ -86,12 +87,40 @@ public class ParquetWriter<T> implements Closeable {
 
         this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
         CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName);
+
+        final Map<String, String> extraMetadata;
+        if (encodingProps.getExtraMetaData() == null
+                || encodingProps.getExtraMetaData().isEmpty()) {
+            extraMetadata = writeContext.getExtraMetaData();
+        } else {
+            extraMetadata = new HashMap<>(writeContext.getExtraMetaData());
+
+            encodingProps
+                    .getExtraMetaData()
+                    .forEach(
+                            (metadataKey, metadataValue) -> {
+                                if (metadataKey.equals(OBJECT_MODEL_NAME_PROP)) {
+                                    throw new IllegalArgumentException(
+                                            "Cannot overwrite metadata key "
+                                                    + OBJECT_MODEL_NAME_PROP
+                                                    + ". Please use another key name.");
+                                }
+
+                                if (extraMetadata.put(metadataKey, metadataValue) != null) {
+                                    throw new IllegalArgumentException(
+                                            "Duplicate metadata key "
+                                                    + metadataKey
+                                                    + ". Please use another key name.");
+                                }
+                            });
+        }
+
         this.writer =
                 new InternalParquetRecordWriter<T>(
                         fileWriter,
                         writeSupport,
                         schema,
-                        writeContext.getExtraMetaData(),
+                        extraMetadata,
                         rowGroupSize,
                         compressor,
                         validating,
diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
index d8a06950fa..d640d379ba 100644
--- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
+++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
@@ -28,6 +28,7 @@ import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
 import org.apache.parquet.filter2.predicate.Operators;
 import org.apache.parquet.filter2.predicate.Operators.And;
 import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.Contains;
 import org.apache.parquet.filter2.predicate.Operators.Eq;
 import org.apache.parquet.filter2.predicate.Operators.Gt;
 import org.apache.parquet.filter2.predicate.Operators.GtEq;
@@ -195,6 +196,12 @@ public class ColumnIndexFilter implements Visitor<RowRanges> {
                 notIn.getColumn(), ci -> ci.visit(notIn), isNull ? RowRanges.EMPTY : allRows());
     }
 
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(Contains<T> contains) {
+        return contains.filter(
+                this, RowRanges::intersection, RowRanges::union, ranges -> allRows());
+    }
+
     @Override
     public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> RowRanges visit(
             UserDefined<T, U> udp) {
@@ -239,7 +246,7 @@ public class ColumnIndexFilter implements Visitor<RowRanges> {
     @Override
     public RowRanges visit(And and) {
         RowRanges leftResult = and.getLeft().accept(this);
-        if (leftResult.getRanges().size() == 0) {
+        if (leftResult.getRanges().isEmpty()) {
             return leftResult;
         }
 

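For readers skimming the diff: the vectored IO change above boils down to a length guard before choosing the vectored read path. Below is a minimal standalone sketch of that guard; the class and method names are illustrative only and are not part of the patch (the actual check lives in ParquetFileReader#arePartsValidForVectoredIo over ConsecutivePartList.length), but the comparison against Integer.MAX_VALUE is the same idea.

    import java.util.Arrays;
    import java.util.List;

    public class VectoredIoGuardSketch {

        // A single consecutive range of Integer.MAX_VALUE bytes or more cannot be
        // expressed as one int-sized vectored read, so the reader falls back to the
        // normal sequential read path for the whole row group.
        static boolean partsFitVectoredIo(List<Long> partLengthsInBytes) {
            for (long length : partLengthsInBytes) {
                if (length >= Integer.MAX_VALUE) {
                    return false;
                }
            }
            return true;
        }

        public static void main(String[] args) {
            // 4 KiB and 1 MiB parts are fine; an 8 GiB part disables vectored IO.
            System.out.println(partsFitVectoredIo(Arrays.asList(4096L, 1L << 20))); // true
            System.out.println(partsFitVectoredIo(Arrays.asList(4096L, 1L << 33))); // false
        }
    }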