This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 910bcc4ed PARQUET-2342: Fix writing corrupted parquet file by avoiding
overflow on page value count (#1135)
910bcc4ed is described below
commit 910bcc4edc2d707670e02e9ceadd98dacd9f08d2
Author: Zamil Majdy <[email protected]>
AuthorDate: Thu Aug 31 09:16:10 2023 +0200
PARQUET-2342: Fix writing corrupted parquet file by avoiding overflow on
page value count (#1135)
The Parquet writer only checks the number of rows and the page size to
decide whether the content written so far still fits on a single page.
In the case of a composite column (e.g. array/map) with many nulls,
it is possible to accumulate more than 2 billion values while staying under
the default page size and row-count thresholds (1 MB, 20,000 rows).
This change adds an integer value-count threshold so that the value count
cannot overflow within a single page.
---
.../apache/parquet/column/ParquetProperties.java | 15 +++++++++++++
.../parquet/column/impl/ColumnWriteStoreBase.java | 4 +++-
.../parquet/column/impl/ColumnWriterBase.java | 4 ++++
.../apache/parquet/column/mem/TestMemColumn.java | 26 ++++++++++++++++++++--
parquet-hadoop/README.md | 6 +++++
.../apache/parquet/hadoop/ParquetOutputFormat.java | 7 ++++++
6 files changed, 59 insertions(+), 3 deletions(-)
diff --git
a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index dda58736f..cab7a9687 100644
---
a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++
b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -53,6 +53,7 @@ public class ParquetProperties {
public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK =
true;
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
+ public static final int DEFAULT_PAGE_VALUE_COUNT_THRESHOLD =
Integer.MAX_VALUE / 2;
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH =
Integer.MAX_VALUE;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
@@ -91,6 +92,7 @@ public class ParquetProperties {
private final int initialSlabSize;
private final int pageSizeThreshold;
+ private final int pageValueCountThreshold;
private final int dictionaryPageSizeThreshold;
private final WriterVersion writerVersion;
private final ColumnProperty<Boolean> dictionaryEnabled;
@@ -115,6 +117,7 @@ public class ParquetProperties {
private ParquetProperties(Builder builder) {
this.pageSizeThreshold = builder.pageSize;
+ this.pageValueCountThreshold = builder.pageValueCountThreshold;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
this.dictionaryPageSizeThreshold = builder.dictPageSize;
@@ -177,6 +180,10 @@ public class ParquetProperties {
return pageSizeThreshold;
}
+ public int getPageValueCountThreshold() {
+ return pageValueCountThreshold;
+ }
+
public int getInitialSlabSize() {
return initialSlabSize;
}
@@ -323,6 +330,7 @@ public class ParquetProperties {
private WriterVersion writerVersion = DEFAULT_WRITER_VERSION;
private int minRowCountForPageSizeCheck =
DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
private int maxRowCountForPageSizeCheck =
DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
+ private int pageValueCountThreshold = DEFAULT_PAGE_VALUE_COUNT_THRESHOLD;
private boolean estimateNextSizeCheck =
DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK;
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
private ValuesWriterFactory valuesWriterFactory =
DEFAULT_VALUES_WRITER_FACTORY;
@@ -447,6 +455,13 @@ public class ParquetProperties {
return this;
}
+ public Builder withPageValueCountThreshold(int value) {
+ Preconditions.checkArgument(value > 0,
+ "Invalid page value count threshold (negative): %s", value);
+ this.pageValueCountThreshold = value;
+ return this;
+ }
+
// Do not attempt to predict next size check. Prevents issues with rows
that vary significantly in size.
public Builder estimateRowCountForPageSizeCheck(boolean
estimateNextSizeCheck) {
this.estimateNextSizeCheck = estimateNextSizeCheck;
diff --git
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index 8cfdace8d..e0503befc 100644
---
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -231,7 +231,9 @@ abstract class ColumnWriteStoreBase implements
ColumnWriteStore {
long usedMem = writer.getCurrentPageBufferedSize();
long rows = rowCount - writer.getRowsWrittenSoFar();
long remainingMem = props.getPageSizeThreshold() - usedMem;
- if (remainingMem <= thresholdTolerance || rows >= pageRowCountLimit) {
+ if (remainingMem <= thresholdTolerance ||
+ rows >= pageRowCountLimit ||
+ writer.getValueCount() >= props.getPageValueCountThreshold()) {
writer.writePage();
remainingMem = props.getPageSizeThreshold();
} else {
diff --git
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 8e11676b5..f0ecb71cc 100644
---
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -380,6 +380,10 @@ abstract class ColumnWriterBase implements ColumnWriter {
return this.rowsWrittenSoFar;
}
+ int getValueCount() {
+ return this.valueCount;
+ }
+
/**
* Writes the current data to a new page in the page store
*/
diff --git
a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index 3b750a0a1..4af185e50 100644
---
a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++
b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -105,6 +105,30 @@ public class TestMemColumn {
}
}
+ @Test
+ public void testMemColumnBinaryExceedIntMaxValue() throws Exception {
+ MessageType mt = MessageTypeParser.parseMessageType("message msg {
required group v (LIST) { repeated group list { optional binary element; } }
}");
+ String[] col = new String[]{"v", "list", "element"};
+ MemPageStore memPageStore = new MemPageStore(100);
+
+ ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+ ColumnDescriptor path = mt.getColumnDescription(col);
+ ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
+
+ int numRows = 20000;
+ int numEntries = 110000;
+ for (int row=0; row < numRows; row++) {
+ columnWriter.writeNull(0, 1);
+ for (int i=1; i < numEntries; i++) columnWriter.writeNull( 1, 1);
+ memColumnsStore.endRecord();
+ }
+ memColumnsStore.flush();
+
+ ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
+ assertEquals("parquet page value-count should fit on the signed-int range",
+ columnReader.getTotalValueCount(), (long) numRows * numEntries);
+ }
+
@Test
public void testMemColumnSeveralPages() throws Exception {
MessageType mt = MessageTypeParser.parseMessageType("message msg {
required group foo { required int64 bar; } }");
@@ -180,7 +204,6 @@ public class TestMemColumn {
.requiredList().requiredElement(BINARY).named("binary_col")
.requiredList().requiredElement(INT32).named("int32_col")
.named("msg");
- System.out.println(schema);
MemPageStore memPageStore = new MemPageStore(123);
// Using V2 pages so we have rowCount info
@@ -239,7 +262,6 @@ public class TestMemColumn {
private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore
memPageStore) {
return new ColumnWriteStoreV1(memPageStore,
ParquetProperties.builder()
- .withPageSize(2048)
.withDictionaryEncoding(false)
.build());
}
diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md
index c27c5f2fc..6f2373b9e 100644
--- a/parquet-hadoop/README.md
+++ b/parquet-hadoop/README.md
@@ -175,6 +175,12 @@ If the frequency is low, the performance will be better.
---
+**Property:** `parquet.page.value.count.threshold`
+**Description:** The value count threshold within a Parquet page used on each
page check.
+**Default value:** `Integer.MAX_VALUE / 2`
+
+---
+
**Property:** `parquet.page.size.check.estimate`
**Description:** If it is true, the column writer estimates the size of the
next page.
It prevents issues with rows that vary significantly in size.
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index fe718c063..dc23802cf 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -146,6 +146,7 @@ public class ParquetOutputFormat<T> extends
FileOutputFormat<Void, T> {
public static final String MAX_PADDING_BYTES =
"parquet.writer.max-padding";
public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK =
"parquet.page.size.row.check.min";
public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK =
"parquet.page.size.row.check.max";
+ public static final String PAGE_VALUE_COUNT_THRESHOLD =
"parquet.page.value.count.threshold";
public static final String ESTIMATE_PAGE_SIZE_CHECK =
"parquet.page.size.check.estimate";
public static final String COLUMN_INDEX_TRUNCATE_LENGTH =
"parquet.columnindex.truncate.length";
public static final String STATISTICS_TRUNCATE_LENGTH =
"parquet.statistics.truncate.length";
@@ -278,6 +279,11 @@ public class ParquetOutputFormat<T> extends
FileOutputFormat<Void, T> {
ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
}
+ public static int getValueCountThreshold(Configuration configuration) {
+ return configuration.getInt(PAGE_VALUE_COUNT_THRESHOLD,
+ ParquetProperties.DEFAULT_PAGE_VALUE_COUNT_THRESHOLD);
+ }
+
public static boolean getEstimatePageSizeCheck(Configuration configuration) {
return configuration.getBoolean(ESTIMATE_PAGE_SIZE_CHECK,
ParquetProperties.DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK);
@@ -456,6 +462,7 @@ public class ParquetOutputFormat<T> extends
FileOutputFormat<Void, T> {
.estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
.withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
.withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
+ .withPageValueCountThreshold(getValueCountThreshold(conf))
.withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
.withStatisticsTruncateLength(getStatisticsTruncateLength(conf))
.withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))