This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git


The following commit(s) were added to refs/heads/master by this push:
     new cc7522536 GH-3235: Row count limit for each row group (#3236)
cc7522536 is described below

commit cc7522536e80f1fc8cbff1d060da17ed3f01448a
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Jun 18 16:43:05 2025 +0800

    GH-3235: Row count limit for each row group (#3236)
---
 .../org/apache/parquet/column/ParquetProperties.java | 14 ++++++++++++++
 parquet-hadoop/README.md                             |  6 +++++-
 .../parquet/hadoop/InternalParquetRecordWriter.java  | 17 ++++++++++++-----
 .../apache/parquet/hadoop/ParquetOutputFormat.java   | 14 ++++++++++++++
 .../org/apache/parquet/hadoop/ParquetWriter.java     | 11 +++++++++++
 .../org/apache/parquet/hadoop/TestParquetWriter.java | 20 ++++++++++++++++----
 6 files changed, 72 insertions(+), 10 deletions(-)

diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java 
b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index cb5931581..d3dced53d 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -57,6 +57,7 @@ public class ParquetProperties {
   public static final int DEFAULT_PAGE_VALUE_COUNT_THRESHOLD = 
Integer.MAX_VALUE / 2;
   public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
   public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH = 
Integer.MAX_VALUE;
+  public static final int DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT = 
Integer.MAX_VALUE;
   public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
   public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;
   public static final boolean DEFAULT_BLOOM_FILTER_ENABLED = false;
@@ -122,6 +123,7 @@ public class ParquetProperties {
   private final ColumnProperty<Boolean> bloomFilterEnabled;
   private final ColumnProperty<Boolean> adaptiveBloomFilterEnabled;
   private final ColumnProperty<Integer> numBloomFilterCandidates;
+  private final int rowGroupRowCountLimit;
   private final int pageRowCountLimit;
   private final boolean pageWriteChecksumEnabled;
   private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
@@ -153,6 +155,7 @@ public class ParquetProperties {
     this.maxBloomFilterBytes = builder.maxBloomFilterBytes;
     this.adaptiveBloomFilterEnabled = 
builder.adaptiveBloomFilterEnabled.build();
     this.numBloomFilterCandidates = builder.numBloomFilterCandidates.build();
+    this.rowGroupRowCountLimit = builder.rowGroupRowCountLimit;
     this.pageRowCountLimit = builder.pageRowCountLimit;
     this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
     this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
@@ -302,6 +305,10 @@ public class ParquetProperties {
     return estimateNextSizeCheck;
   }
 
+  public int getRowGroupRowCountLimit() {
+    return rowGroupRowCountLimit;
+  }
+
   public int getPageRowCountLimit() {
     return pageRowCountLimit;
   }
@@ -400,6 +407,7 @@ public class ParquetProperties {
     private final ColumnProperty.Builder<Boolean> adaptiveBloomFilterEnabled;
     private final ColumnProperty.Builder<Integer> numBloomFilterCandidates;
     private final ColumnProperty.Builder<Boolean> bloomFilterEnabled;
+    private int rowGroupRowCountLimit = DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT;
     private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
     private boolean pageWriteChecksumEnabled = 
DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
     private final ColumnProperty.Builder<ByteStreamSplitMode> 
byteStreamSplitEnabled;
@@ -679,6 +687,12 @@ public class ParquetProperties {
       return this;
     }
 
+    public Builder withRowGroupRowCountLimit(int rowCount) {
+      Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for 
row groups: %s", rowCount);
+      rowGroupRowCountLimit = rowCount;
+      return this;
+    }
+
     public Builder withPageRowCountLimit(int rowCount) {
       Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for 
pages: %s", rowCount);
       pageRowCountLimit = rowCount;
diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md
index b0350b881..4826d49e2 100644
--- a/parquet-hadoop/README.md
+++ b/parquet-hadoop/README.md
@@ -266,12 +266,16 @@ conf.set("parquet.bloom.filter.fpp#column.path", 0.02)
 
 ---
 
-
 **Property:** `parquet.decrypt.off-heap.buffer.enabled`  
 **Description:** Whether to use direct buffers to decrypt encrypted files. 
This should be set to 
 true if the reader is using a `DirectByteBufferAllocator`
 **Default value:** `false`
 
+---
+
+**Property:** `parquet.block.row.count.limit`
+**Description:** The maximum number of rows per row group.
+**Default value:** `2147483647` (Integer.MAX_VALUE)
 
 ---
 **Property:** `parquet.page.row.count.limit`  
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 0cc05d6d7..f29628680 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -48,8 +48,8 @@ class InternalParquetRecordWriter<T> {
   private final WriteSupport<T> writeSupport;
   private final MessageType schema;
   private final Map<String, String> extraMetaData;
-  private final long rowGroupSize;
   private long rowGroupSizeThreshold;
+  private final int rowGroupRecordCountThreshold;
   private long nextRowGroupSize;
   private final BytesInputCompressor compressor;
   private final boolean validating;
@@ -91,8 +91,8 @@ class InternalParquetRecordWriter<T> {
     this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport 
cannot be null");
     this.schema = schema;
     this.extraMetaData = extraMetaData;
-    this.rowGroupSize = rowGroupSize;
     this.rowGroupSizeThreshold = rowGroupSize;
+    this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit();
     this.nextRowGroupSize = rowGroupSizeThreshold;
     this.compressor = compressor;
     this.validating = validating;
@@ -166,9 +166,16 @@ class InternalParquetRecordWriter<T> {
   }
 
   private void checkBlockSizeReached() throws IOException {
-    if (recordCount
-        >= recordCountForNextMemCheck) { // checking the memory size is 
relatively expensive, so let's not do it
-      // for every record.
+    if (recordCount >= rowGroupRecordCountThreshold) {
+      LOG.debug("record count reaches threshold: flushing {} records to 
disk.", recordCount);
+      flushRowGroupToStore();
+      initStore();
+      recordCountForNextMemCheck = min(
+          max(props.getMinRowCountForPageSizeCheck(), recordCount / 2),
+          props.getMaxRowCountForPageSizeCheck());
+      this.lastRowGroupEndPos = parquetFileWriter.getPos();
+    } else if (recordCount >= recordCountForNextMemCheck) {
+      // checking the memory size is relatively expensive, so let's not do it 
for every record.
       long memSize = columnStore.getBufferedSize();
       long recordSize = memSize / recordCount;
       // flush the row group if it is within ~2 records of the limit
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index f00fde9aa..403666868 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -154,6 +154,7 @@ public class ParquetOutputFormat<T> extends 
FileOutputFormat<Void, T> {
   public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
   public static final String ADAPTIVE_BLOOM_FILTER_ENABLED = 
"parquet.bloom.filter.adaptive.enabled";
   public static final String BLOOM_FILTER_CANDIDATES_NUMBER = 
"parquet.bloom.filter.candidates.number";
+  public static final String BLOCK_ROW_COUNT_LIMIT = 
"parquet.block.row.count.limit";
   public static final String PAGE_ROW_COUNT_LIMIT = 
"parquet.page.row.count.limit";
   public static final String PAGE_WRITE_CHECKSUM_ENABLED = 
"parquet.page.write-checksum.enabled";
   public static final String STATISTICS_ENABLED = 
"parquet.column.statistics.enabled";
@@ -366,6 +367,18 @@ public class ParquetOutputFormat<T> extends 
FileOutputFormat<Void, T> {
     return conf.getInt(STATISTICS_TRUNCATE_LENGTH, 
ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
   }
 
+  public static void setBlockRowCountLimit(JobContext jobContext, int 
rowCount) {
+    setBlockRowCountLimit(getConfiguration(jobContext), rowCount);
+  }
+
+  public static void setBlockRowCountLimit(Configuration conf, int rowCount) {
+    conf.setInt(BLOCK_ROW_COUNT_LIMIT, rowCount);
+  }
+
+  static int getBlockRowCountLimit(Configuration conf) {
+    return conf.getInt(BLOCK_ROW_COUNT_LIMIT, 
ParquetProperties.DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT);
+  }
+
   public static void setPageRowCountLimit(JobContext jobContext, int rowCount) 
{
     setPageRowCountLimit(getConfiguration(jobContext), rowCount);
   }
@@ -500,6 +513,7 @@ public class ParquetOutputFormat<T> extends 
FileOutputFormat<Void, T> {
         .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
         .withBloomFilterEnabled(getBloomFilterEnabled(conf))
         .withAdaptiveBloomFilterEnabled(getAdaptiveBloomFilterEnabled(conf))
+        .withRowGroupRowCountLimit(getBlockRowCountLimit(conf))
         .withPageRowCountLimit(getPageRowCountLimit(conf))
         .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
         .withStatisticsEnabled(getStatisticsEnabled(conf));
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index d4c4bc104..7789cad5c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -609,6 +609,17 @@ public class ParquetWriter<T> implements Closeable {
       return self();
     }
 
+    /**
+     * Sets the Parquet format row group row count limit used by the 
constructed writer.
+     *
+     * @param rowCount limit for the number of rows stored in a row group
+     * @return this builder for method chaining
+     */
+    public SELF withRowGroupRowCountLimit(int rowCount) {
+      encodingPropsBuilder.withRowGroupRowCountLimit(rowCount);
+      return self();
+    }
+
     /**
      * Sets the Parquet format page row count limit used by the constructed 
writer.
      *
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 739aa85d2..2cd83624f 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -443,8 +443,17 @@ public class TestParquetWriter {
     testParquetFileNumberOfBlocks(
         ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
         ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK,
+        new Configuration(),
         1);
-    testParquetFileNumberOfBlocks(1, 1, 3);
+    testParquetFileNumberOfBlocks(1, 1, new Configuration(), 3);
+
+    Configuration conf = new Configuration();
+    ParquetOutputFormat.setBlockRowCountLimit(conf, 1);
+    testParquetFileNumberOfBlocks(
+        ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
+        ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK,
+        conf,
+        3);
   }
 
   @Test
@@ -506,7 +515,10 @@ public class TestParquetWriter {
   }
 
   private void testParquetFileNumberOfBlocks(
-      int minRowCountForPageSizeCheck, int maxRowCountForPageSizeCheck, int 
expectedNumberOfBlocks)
+      int minRowCountForPageSizeCheck,
+      int maxRowCountForPageSizeCheck,
+      Configuration conf,
+      int expectedNumberOfBlocks)
       throws IOException {
     MessageType schema = Types.buildMessage()
         .required(BINARY)
@@ -514,7 +526,6 @@ public class TestParquetWriter {
         .named("str")
         .named("msg");
 
-    Configuration conf = new Configuration();
     GroupWriteSupport.setSchema(schema, conf);
 
     File file = temp.newFile();
@@ -523,7 +534,8 @@ public class TestParquetWriter {
     try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
         .withAllocator(allocator)
         .withConf(conf)
-        // Set row group size to 1, to make sure we flush every time
+        
.withRowGroupRowCountLimit(ParquetOutputFormat.getBlockRowCountLimit(conf))
+        // Set row group size to 1, to make sure we flush every time when
         // minRowCountForPageSizeCheck or maxRowCountForPageSizeCheck is 
exceeded
         .withRowGroupSize(1)
         .withMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck)

Reply via email to