This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch parquet-1.16.x
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/parquet-1.16.x by this push:
new c0280254e GH-3358: Add Configurable Thrift Max Message Size for
Parquet Metadata Reading (#3362)
c0280254e is described below
commit c0280254eb1a50706a7569ed087316e072a3faed
Author: Chiran Ravani <[email protected]>
AuthorDate: Tue Dec 2 15:11:55 2025 +0100
GH-3358: Add Configurable Thrift Max Message Size for Parquet Metadata
Reading (#3362)
---
.../main/java/org/apache/parquet/format/Util.java | 72 +++++++++-
.../format/converter/ParquetMetadataConverter.java | 47 ++++++-
.../TestParquetFileReaderMaxMessageSize.java | 146 +++++++++++++++++++++
3 files changed, 255 insertions(+), 10 deletions(-)
diff --git
a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
index d7a4c330c..776fb4557 100644
---
a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
+++
b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
@@ -45,6 +45,7 @@ import
org.apache.parquet.format.event.TypedConsumer.I32Consumer;
import org.apache.parquet.format.event.TypedConsumer.I64Consumer;
import org.apache.parquet.format.event.TypedConsumer.StringConsumer;
import org.apache.thrift.TBase;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -59,6 +60,7 @@ import org.apache.thrift.transport.TTransportException;
public class Util {
private static final int INIT_MEM_ALLOC_ENCR_BUFFER = 100;
+ private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB
public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream
to) throws IOException {
writeColumnIndex(columnIndex, to, null, null);
@@ -156,6 +158,15 @@ public class Util {
return read(from, new FileMetaData(), decryptor, AAD);
}
+ /**
+  * Reads a {@link FileMetaData} without a decryptor, using a caller-supplied Thrift max message size.
+  * Delegates to the decryptor-aware overload, passing {@code null} for both the decryptor and the AAD.
+  *
+  * @param from input stream to read the Thrift struct from
+  * @param maxMessageSize max message size forwarded to the delegate
+  * @return the metadata returned by the delegate
+  * @throws IOException propagated from the delegate
+  */
+ public static FileMetaData readFileMetaData(InputStream from, int maxMessageSize) throws IOException {
+   final BlockCipher.Decryptor noDecryptor = null;
+   final byte[] noAad = null;
+   return readFileMetaData(from, noDecryptor, noAad, maxMessageSize);
+ }
+
+ /**
+  * Reads a {@link FileMetaData} into a fresh instance, using a caller-supplied Thrift max message size.
+  *
+  * @param from input stream to read the Thrift struct from
+  * @param decryptor decryptor handed to {@code read(...)}; when {@code null} the stream is read as-is
+  * @param AAD additional authenticated data handed to {@code read(...)} together with the decryptor
+  * @param maxMessageSize max message size forwarded to {@code read(...)}
+  * @return the populated metadata object
+  * @throws IOException propagated from {@code read(...)}
+  */
+ public static FileMetaData readFileMetaData(
+     InputStream from, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize) throws IOException {
+   final FileMetaData target = new FileMetaData();
+   return read(from, target, decryptor, AAD, maxMessageSize);
+ }
+
public static void writeColumnMetaData(
ColumnMetaData columnMetaData, OutputStream to, BlockCipher.Encryptor
encryptor, byte[] AAD)
throws IOException {
@@ -190,6 +201,18 @@ public class Util {
return md;
}
+ /**
+  * Reads a {@link FileMetaData}, optionally skipping row groups, with a caller-supplied Thrift max
+  * message size.
+  *
+  * @param from input stream to read the Thrift struct from
+  * @param skipRowGroups when {@code true} the consumer-based reader is used and receives this flag;
+  *     when {@code false} the whole struct is read via {@code read(...)}
+  * @param decryptor decryptor forwarded to the chosen reader (may be {@code null})
+  * @param AAD additional authenticated data forwarded together with the decryptor
+  * @param maxMessageSize max message size forwarded to the chosen reader
+  * @return the populated metadata object
+  * @throws IOException propagated from the chosen reader
+  */
+ public static FileMetaData readFileMetaData(
+     InputStream from, boolean skipRowGroups, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize)
+     throws IOException {
+   final FileMetaData metadata = new FileMetaData();
+   if (!skipRowGroups) {
+     // Full read: deserialize straight into the target object.
+     read(from, metadata, decryptor, AAD, maxMessageSize);
+     return metadata;
+   }
+   // Event-based read: the consumer fills in the target object field by field.
+   readFileMetaData(from, new DefaultFileMetaDataConsumer(metadata), skipRowGroups, decryptor, AAD, maxMessageSize);
+   return metadata;
+ }
+
public static void writeFileCryptoMetaData(
org.apache.parquet.format.FileCryptoMetaData cryptoMetadata,
OutputStream to) throws IOException {
write(cryptoMetadata, to, null, null);
@@ -293,6 +316,17 @@ public class Util {
BlockCipher.Decryptor decryptor,
byte[] AAD)
throws IOException {
+ readFileMetaData(input, consumer, skipRowGroups, decryptor, AAD,
DEFAULT_MAX_MESSAGE_SIZE);
+ }
+
+ public static void readFileMetaData(
+ final InputStream input,
+ final FileMetaDataConsumer consumer,
+ boolean skipRowGroups,
+ BlockCipher.Decryptor decryptor,
+ byte[] AAD,
+ int maxMessageSize)
+ throws IOException {
try {
DelegatingFieldConsumer eventConsumer = fieldConsumer()
.onField(VERSION, new I32Consumer() {
@@ -358,26 +392,54 @@ public class Util {
byte[] plainText = decryptor.decrypt(input, AAD);
from = new ByteArrayInputStream(plainText);
}
- new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer);
+ new EventBasedThriftReader(protocol(from,
maxMessageSize)).readStruct(eventConsumer);
} catch (TException e) {
throw new IOException("can not read FileMetaData: " + e.getMessage(), e);
}
}
private static TProtocol protocol(OutputStream to) throws
TTransportException {
- return protocol(new TIOStreamTransport(to));
+ return protocol(new TIOStreamTransport(to), DEFAULT_MAX_MESSAGE_SIZE);
}
private static TProtocol protocol(InputStream from) throws
TTransportException {
- return protocol(new TIOStreamTransport(from));
+ return protocol(new TIOStreamTransport(from), DEFAULT_MAX_MESSAGE_SIZE);
+ }
+
+ private static TProtocol protocol(InputStream from, int maxMessageSize)
throws TTransportException {
+ return protocol(new TIOStreamTransport(from), maxMessageSize);
}
- private static InterningProtocol protocol(TIOStreamTransport t) {
+ /**
+  * Wraps {@code t} in an interning compact protocol after applying the Thrift max message size
+  * to the transport's configuration.
+  *
+  * @param t transport to wrap; its configuration and known message size are updated in place
+  * @param configuredMaxMessageSize limit in bytes, or {@code -1} to use DEFAULT_MAX_MESSAGE_SIZE
+  * @return the protocol reading from / writing to {@code t}
+  * @throws NumberFormatException if the limit is zero or negative and is not the {@code -1} sentinel
+  * @throws TTransportException declared for the transport calls made here
+  */
+ private static InterningProtocol protocol(TIOStreamTransport t, int configuredMaxMessageSize)
+     throws TTransportException, NumberFormatException {
+   int maxMessageSize = configuredMaxMessageSize;
+   if (configuredMaxMessageSize == -1) {
+     // -1 is the "not configured" sentinel: fall back to the 100 MB default.
+     maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
+   } else if (configuredMaxMessageSize <= 0) {
+     // Must be an else-branch: a separate check would also reject the -1 sentinel handled above.
+     throw new NumberFormatException("Max message size must be positive: " + configuredMaxMessageSize);
+   }
+
+   TConfiguration config = t.getConfiguration();
+   config.setMaxMessageSize(maxMessageSize);
+   /*
+    Reset known message size to 0 to force checking against the max message size.
+    This is necessary when reusing the same transport for multiple reads/writes,
+    as the known message size may be larger than the max message size.
+   */
+   t.updateKnownMessageSize(0);
return new InterningProtocol(new TCompactProtocol(t));
}
private static <T extends TBase<?, ?>> T read(
final InputStream input, T tbase, BlockCipher.Decryptor decryptor,
byte[] AAD) throws IOException {
+ return read(input, tbase, decryptor, AAD, DEFAULT_MAX_MESSAGE_SIZE);
+ }
+
+ private static <T extends TBase<?, ?>> T read(
+ final InputStream input, T tbase, BlockCipher.Decryptor decryptor,
byte[] AAD, int maxMessageSize)
+ throws IOException {
final InputStream from;
if (null == decryptor) {
from = input;
@@ -387,7 +449,7 @@ public class Util {
}
try {
- tbase.read(protocol(from));
+ tbase.read(protocol(from, maxMessageSize));
return tbase;
} catch (TException e) {
throw new IOException("can not read " + tbase.getClass() + ": " +
e.getMessage(), e);
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index d20ac7fae..a1a256329 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -147,6 +147,15 @@ public class ParquetMetadataConverter {
public static final MetadataFilter SKIP_ROW_GROUPS = new
SkipMetadataFilter();
public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k
+ /**
+ * Configuration property to control the Thrift max message size when reading Parquet metadata.
+ * This is useful for files with very large metadata.
+ * Default value is 100 MB.
+ */
+ public static final String PARQUET_THRIFT_STRING_SIZE_LIMIT =
"parquet.thrift.string.size.limit";
+
+ private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB
+
private static final Logger LOG =
LoggerFactory.getLogger(ParquetMetadataConverter.class);
private static final LogicalTypeConverterVisitor
LOGICAL_TYPE_ANNOTATION_VISITOR =
new LogicalTypeConverterVisitor();
@@ -154,6 +163,7 @@ public class ParquetMetadataConverter {
new ConvertedTypeConverterVisitor();
private final int statisticsTruncateLength;
private final boolean useSignedStringMinMax;
+ private final ParquetReadOptions options;
public ParquetMetadataConverter() {
this(false);
@@ -173,7 +183,7 @@ public class ParquetMetadataConverter {
}
public ParquetMetadataConverter(ParquetReadOptions options) {
- this(options.useSignedStringMinMax());
+ this(options.useSignedStringMinMax(),
ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, options);
}
private ParquetMetadataConverter(boolean useSignedStringMinMax) {
@@ -181,11 +191,30 @@ public class ParquetMetadataConverter {
}
private ParquetMetadataConverter(boolean useSignedStringMinMax, int
statisticsTruncateLength) {
+ this(useSignedStringMinMax, statisticsTruncateLength, null);
+ }
+
+ private ParquetMetadataConverter(
+ boolean useSignedStringMinMax, int statisticsTruncateLength,
ParquetReadOptions options) {
if (statisticsTruncateLength <= 0) {
throw new IllegalArgumentException("Truncate length should be greater
than 0");
}
this.useSignedStringMinMax = useSignedStringMinMax;
this.statisticsTruncateLength = statisticsTruncateLength;
+ this.options = options;
+ }
+
+ /**
+  * Resolves the max message size to use for Thrift deserialization.
+  * When read options with a configuration are present, the value is looked up under
+  * {@link #PARQUET_THRIFT_STRING_SIZE_LIMIT}, with DEFAULT_MAX_MESSAGE_SIZE passed as the fallback
+  * argument; otherwise -1 is returned.
+  *
+  * @return the max message size in bytes, or -1 to use the default
+  */
+ private int getMaxMessageSize() {
+   if (options == null || options.getConfiguration() == null) {
+     return -1;
+   }
+   return options.getConfiguration().getInt(PARQUET_THRIFT_STRING_SIZE_LIMIT, DEFAULT_MAX_MESSAGE_SIZE);
}
// NOTE: this cache is for memory savings, not cpu savings, and is used to
de-duplicate
@@ -1694,21 +1723,27 @@ public class ParquetMetadataConverter {
filter.accept(new
MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter)
throws IOException {
- FileMetaData fileMetadata = readFileMetaData(from,
footerDecryptor, encryptedFooterAAD);
+ int maxMessageSize = getMaxMessageSize();
+ FileMetaData fileMetadata =
+ readFileMetaData(from, footerDecryptor, encryptedFooterAAD,
maxMessageSize);
return new FileMetaDataAndRowGroupOffsetInfo(
fileMetadata, generateRowGroupOffsets(fileMetadata));
}
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter
filter) throws IOException {
- FileMetaData fileMetadata = readFileMetaData(from, true,
footerDecryptor, encryptedFooterAAD);
+ int maxMessageSize = getMaxMessageSize();
+ FileMetaData fileMetadata =
+ readFileMetaData(from, true, footerDecryptor,
encryptedFooterAAD, maxMessageSize);
return new FileMetaDataAndRowGroupOffsetInfo(
fileMetadata, generateRowGroupOffsets(fileMetadata));
}
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter
filter) throws IOException {
- FileMetaData fileMetadata = readFileMetaData(from,
footerDecryptor, encryptedFooterAAD);
+ int maxMessageSize = getMaxMessageSize();
+ FileMetaData fileMetadata =
+ readFileMetaData(from, footerDecryptor, encryptedFooterAAD,
maxMessageSize);
// We must generate the map *before* filtering because it modifies
`fileMetadata`.
Map<RowGroup, Long> rowGroupToRowIndexOffsetMap =
generateRowGroupOffsets(fileMetadata);
FileMetaData filteredFileMetadata =
filterFileMetaDataByStart(fileMetadata, filter);
@@ -1717,7 +1752,9 @@ public class ParquetMetadataConverter {
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter
filter) throws IOException {
- FileMetaData fileMetadata = readFileMetaData(from,
footerDecryptor, encryptedFooterAAD);
+ int maxMessageSize = getMaxMessageSize();
+ FileMetaData fileMetadata =
+ readFileMetaData(from, footerDecryptor, encryptedFooterAAD,
maxMessageSize);
// We must generate the map *before* filtering because it modifies
`fileMetadata`.
Map<RowGroup, Long> rowGroupToRowIndexOffsetMap =
generateRowGroupOffsets(fileMetadata);
FileMetaData filteredFileMetadata =
filterFileMetaDataByMidpoint(fileMetadata, filter);
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java
new file mode 100644
index 000000000..f9f121b99
--- /dev/null
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Exercises the configurable Thrift max message size when opening a Parquet file whose footer
+ * describes many columns.
+ */
+public class TestParquetFileReaderMaxMessageSize {
+
+  /** Number of int64 columns in the fixture schema (and values in its single row). */
+  private static final int COLUMN_COUNT = 2000;
+
+  /** Read-side property carrying the Thrift max message size. */
+  private static final String MAX_MESSAGE_SIZE_KEY = "parquet.thrift.string.size.limit";
+
+  public static Path TEST_FILE;
+  public MessageType schema;
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  /** Writes a one-row file with COLUMN_COUNT required int64 columns before each test. */
+  @Before
+  public void testSetup() throws IOException {
+    File testParquetFile = temp.newFile();
+    // Only the unique path is kept; presumably the writer must create the file itself.
+    testParquetFile.delete();
+
+    TEST_FILE = new Path(testParquetFile.toURI());
+
+    // Create a file with many columns
+    StringBuilder schemaBuilder = new StringBuilder("message test_schema {");
+    for (int i = 0; i < COLUMN_COUNT; i++) {
+      schemaBuilder.append("required int64 col_").append(i).append(";");
+    }
+    schemaBuilder.append("}");
+
+    schema = MessageTypeParser.parseMessageType(schemaBuilder.toString());
+
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(HadoopOutputFile.fromPath(TEST_FILE, conf))
+        .withConf(conf)
+        .withType(schema)
+        .build()) {
+
+      Group group = new SimpleGroupFactory(schema).newGroup();
+      for (int col = 0; col < COLUMN_COUNT; col++) {
+        group.append("col_" + col, 1L);
+      }
+      writer.write(group);
+    }
+  }
+
+  /**
+   * Test reading a file with many columns using custom max message size
+   */
+  @Test
+  public void testReadFileWithManyColumns() throws IOException {
+    Configuration readConf = new Configuration();
+    readConf.setInt(MAX_MESSAGE_SIZE_KEY, 200 * 1024 * 1024);
+
+    ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+
+      ParquetMetadata metadata = reader.getFooter();
+      assertNotNull(metadata);
+      assertEquals(schema, metadata.getFileMetaData().getSchema());
+      assertTrue(metadata.getBlocks().size() > 0);
+    }
+  }
+
+  /**
+   * Test that default configuration works for normal files
+   */
+  @Test
+  public void testReadNormalFileWithDefaultConfig() throws IOException {
+    // Read with default configuration (no custom max message size)
+    Configuration readConf = new Configuration();
+    ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+
+      ParquetMetadata metadata = reader.getFooter();
+      assertNotNull(metadata);
+      assertEquals(1, metadata.getBlocks().get(0).getRowCount());
+    }
+  }
+
+  /**
+   * Test that insufficient max message size produces error
+   */
+  @Test
+  public void testInsufficientMaxMessageSizeError() throws IOException {
+    // Try to read with very small max message size
+    Configuration readConf = new Configuration();
+    readConf.setInt(MAX_MESSAGE_SIZE_KEY, 1); // Only 1 byte
+
+    ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+      fail("Should have thrown Message size exceeds limit due to MaxMessageSize");
+    } catch (IOException e) {
+      // Include the exception in the failure message instead of dumping a stack trace.
+      assertTrue("Expected a Thrift max message size error but got: " + e, mentionsMessageSizeLimit(e));
+    }
+  }
+
+  /**
+   * Returns true when {@code error} or any exception in its cause chain carries one of the
+   * max-message-size messages this test accepts. Null messages and a missing cause are
+   * tolerated so an unexpected exception fails the assertion rather than raising an NPE.
+   */
+  private static boolean mentionsMessageSizeLimit(Throwable error) {
+    for (Throwable current = error; current != null; current = current.getCause()) {
+      String message = current.getMessage();
+      if (message != null
+          && (message.contains("Message size exceeds limit") || message.contains("MaxMessageSize reached"))) {
+        return true;
+      }
+    }
+    return false;
+  }
+}