This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 910bcc4ed PARQUET-2342: Fix writing corrupted parquet file by avoiding 
overflow on page value count (#1135)
910bcc4ed is described below

commit 910bcc4edc2d707670e02e9ceadd98dacd9f08d2
Author: Zamil Majdy <[email protected]>
AuthorDate: Thu Aug 31 09:16:10 2023 +0200

    PARQUET-2342: Fix writing corrupted parquet file by avoiding overflow on 
page value count (#1135)
    
    Parquet writer only checks the number of rows and the page size to
    decide whether it needs to fit content written on a single page.
    In the case of a composite column (e.g., array/map) with many nulls,
    it is possible to create 2 billion+ values under the default page
    size & row-count thresholds (1MB, 20000 rows).
    This change is adding an Integer limit threshold to avoid value count
    overflow to happen within a single page.
---
 .../apache/parquet/column/ParquetProperties.java   | 15 +++++++++++++
 .../parquet/column/impl/ColumnWriteStoreBase.java  |  4 +++-
 .../parquet/column/impl/ColumnWriterBase.java      |  4 ++++
 .../apache/parquet/column/mem/TestMemColumn.java   | 26 ++++++++++++++++++++--
 parquet-hadoop/README.md                           |  6 +++++
 .../apache/parquet/hadoop/ParquetOutputFormat.java |  7 ++++++
 6 files changed, 59 insertions(+), 3 deletions(-)

diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java 
b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index dda58736f..cab7a9687 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -53,6 +53,7 @@ public class ParquetProperties {
   public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = 
true;
   public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
+  public static final int DEFAULT_PAGE_VALUE_COUNT_THRESHOLD = 
Integer.MAX_VALUE / 2;
   public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
   public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH = 
Integer.MAX_VALUE;
   public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
@@ -91,6 +92,7 @@ public class ParquetProperties {
 
   private final int initialSlabSize;
   private final int pageSizeThreshold;
+  private final int pageValueCountThreshold;
   private final int dictionaryPageSizeThreshold;
   private final WriterVersion writerVersion;
   private final ColumnProperty<Boolean> dictionaryEnabled;
@@ -115,6 +117,7 @@ public class ParquetProperties {
 
   private ParquetProperties(Builder builder) {
     this.pageSizeThreshold = builder.pageSize;
+    this.pageValueCountThreshold = builder.pageValueCountThreshold;
     this.initialSlabSize = CapacityByteArrayOutputStream
       .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
     this.dictionaryPageSizeThreshold = builder.dictPageSize;
@@ -177,6 +180,10 @@ public class ParquetProperties {
     return pageSizeThreshold;
   }
 
+  public int getPageValueCountThreshold() {
+    return pageValueCountThreshold;
+  }
+
   public int getInitialSlabSize() {
     return initialSlabSize;
   }
@@ -323,6 +330,7 @@ public class ParquetProperties {
     private WriterVersion writerVersion = DEFAULT_WRITER_VERSION;
     private int minRowCountForPageSizeCheck = 
DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
     private int maxRowCountForPageSizeCheck = 
DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
+    private int pageValueCountThreshold = DEFAULT_PAGE_VALUE_COUNT_THRESHOLD;
     private boolean estimateNextSizeCheck = 
DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK;
     private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
     private ValuesWriterFactory valuesWriterFactory = 
DEFAULT_VALUES_WRITER_FACTORY;
@@ -447,6 +455,13 @@ public class ParquetProperties {
       return this;
     }
 
+    public Builder withPageValueCountThreshold(int value) {
+      Preconditions.checkArgument(value > 0,
+          "Invalid page value count threshold (negative): %s", value);
+      this.pageValueCountThreshold = value;
+      return this;
+    }
+
     // Do not attempt to predict next size check.  Prevents issues with rows 
that vary significantly in size.
     public Builder estimateRowCountForPageSizeCheck(boolean 
estimateNextSizeCheck) {
       this.estimateNextSizeCheck = estimateNextSizeCheck;
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
 
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index 8cfdace8d..e0503befc 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -231,7 +231,9 @@ abstract class ColumnWriteStoreBase implements 
ColumnWriteStore {
       long usedMem = writer.getCurrentPageBufferedSize();
       long rows = rowCount - writer.getRowsWrittenSoFar();
       long remainingMem = props.getPageSizeThreshold() - usedMem;
-      if (remainingMem <= thresholdTolerance || rows >= pageRowCountLimit) {
+      if (remainingMem <= thresholdTolerance ||
+          rows >= pageRowCountLimit ||
+          writer.getValueCount() >= props.getPageValueCountThreshold()) {
         writer.writePage();
         remainingMem = props.getPageSizeThreshold();
       } else {
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
 
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 8e11676b5..f0ecb71cc 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -380,6 +380,10 @@ abstract class ColumnWriterBase implements ColumnWriter {
     return this.rowsWrittenSoFar;
   }
 
+  int getValueCount() {
+    return this.valueCount;
+  }
+
   /**
    * Writes the current data to a new page in the page store
    */
diff --git 
a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java 
b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index 3b750a0a1..4af185e50 100644
--- 
a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++ 
b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -105,6 +105,30 @@ public class TestMemColumn {
     }
   }
 
+  @Test
+  public void testMemColumnBinaryExceedIntMaxValue() throws Exception {
+    MessageType mt = MessageTypeParser.parseMessageType("message msg { 
required group v (LIST) { repeated group list { optional binary element; } } 
}");
+    String[] col = new String[]{"v", "list", "element"};
+    MemPageStore memPageStore = new MemPageStore(100);
+
+    ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+    ColumnDescriptor path = mt.getColumnDescription(col);
+    ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
+
+    int numRows = 20000;
+    int numEntries = 110000;
+    for (int row=0; row < numRows; row++) {
+      columnWriter.writeNull(0, 1);
+      for (int i=1; i < numEntries; i++) columnWriter.writeNull( 1, 1);
+      memColumnsStore.endRecord();
+    }
+    memColumnsStore.flush();
+
+    ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
+    assertEquals("parquet page value-count should fit on the signed-int range",
+      columnReader.getTotalValueCount(), (long) numRows * numEntries);
+  }
+
   @Test
   public void testMemColumnSeveralPages() throws Exception {
     MessageType mt = MessageTypeParser.parseMessageType("message msg { 
required group foo { required int64 bar; } }");
@@ -180,7 +204,6 @@ public class TestMemColumn {
         .requiredList().requiredElement(BINARY).named("binary_col")
         .requiredList().requiredElement(INT32).named("int32_col")
         .named("msg");
-    System.out.println(schema);
     MemPageStore memPageStore = new MemPageStore(123);
 
     // Using V2 pages so we have rowCount info
@@ -239,7 +262,6 @@ public class TestMemColumn {
   private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore 
memPageStore) {
     return new ColumnWriteStoreV1(memPageStore,
         ParquetProperties.builder()
-            .withPageSize(2048)
             .withDictionaryEncoding(false)
             .build());
   }
diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md
index c27c5f2fc..6f2373b9e 100644
--- a/parquet-hadoop/README.md
+++ b/parquet-hadoop/README.md
@@ -175,6 +175,12 @@ If the frequency is low, the performance will be better.
 
 ---
 
+**Property:** `parquet.page.value.count.threshold`  
+**Description:** The value count threshold within a Parquet page used on each 
page check.
+**Default value:** `Integer.MAX_VALUE / 2`
+
+---
+
 **Property:** `parquet.page.size.check.estimate`  
 **Description:** If it is true, the column writer estimates the size of the 
next page.  
 It prevents issues with rows that vary significantly in size.  
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index fe718c063..dc23802cf 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -146,6 +146,7 @@ public class ParquetOutputFormat<T> extends 
FileOutputFormat<Void, T> {
   public static final String MAX_PADDING_BYTES    = 
"parquet.writer.max-padding";
   public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = 
"parquet.page.size.row.check.min";
   public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = 
"parquet.page.size.row.check.max";
+  public static final String PAGE_VALUE_COUNT_THRESHOLD = 
"parquet.page.value.count.threshold";
   public static final String ESTIMATE_PAGE_SIZE_CHECK = 
"parquet.page.size.check.estimate";
   public static final String COLUMN_INDEX_TRUNCATE_LENGTH = 
"parquet.columnindex.truncate.length";
   public static final String STATISTICS_TRUNCATE_LENGTH = 
"parquet.statistics.truncate.length";
@@ -278,6 +279,11 @@ public class ParquetOutputFormat<T> extends 
FileOutputFormat<Void, T> {
         ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
   }
 
+  public static int getValueCountThreshold(Configuration configuration) {
+    return configuration.getInt(PAGE_VALUE_COUNT_THRESHOLD,
+        ParquetProperties.DEFAULT_PAGE_VALUE_COUNT_THRESHOLD);
+  }
+
   public static boolean getEstimatePageSizeCheck(Configuration configuration) {
     return configuration.getBoolean(ESTIMATE_PAGE_SIZE_CHECK,
         ParquetProperties.DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK);
@@ -456,6 +462,7 @@ public class ParquetOutputFormat<T> extends 
FileOutputFormat<Void, T> {
         .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
         .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
         .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
+        .withPageValueCountThreshold(getValueCountThreshold(conf))
         .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
         .withStatisticsTruncateLength(getStatisticsTruncateLength(conf))
         .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))

Reply via email to