This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new cc7522536 GH-3235: Row count limit for each row group (#3236)
cc7522536 is described below
commit cc7522536e80f1fc8cbff1d060da17ed3f01448a
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Jun 18 16:43:05 2025 +0800
GH-3235: Row count limit for each row group (#3236)
---
.../org/apache/parquet/column/ParquetProperties.java | 14 ++++++++++++++
parquet-hadoop/README.md | 6 +++++-
.../parquet/hadoop/InternalParquetRecordWriter.java | 17 ++++++++++++-----
.../apache/parquet/hadoop/ParquetOutputFormat.java | 14 ++++++++++++++
.../org/apache/parquet/hadoop/ParquetWriter.java | 11 +++++++++++
.../org/apache/parquet/hadoop/TestParquetWriter.java | 20 ++++++++++++++++----
6 files changed, 72 insertions(+), 10 deletions(-)
diff --git
a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index cb5931581..d3dced53d 100644
---
a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++
b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -57,6 +57,7 @@ public class ParquetProperties {
public static final int DEFAULT_PAGE_VALUE_COUNT_THRESHOLD =
Integer.MAX_VALUE / 2;
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH =
Integer.MAX_VALUE;
+ public static final int DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT = Integer.MAX_VALUE;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;
public static final boolean DEFAULT_BLOOM_FILTER_ENABLED = false;
@@ -122,6 +123,7 @@ public class ParquetProperties {
private final ColumnProperty<Boolean> bloomFilterEnabled;
private final ColumnProperty<Boolean> adaptiveBloomFilterEnabled;
private final ColumnProperty<Integer> numBloomFilterCandidates;
+ private final int rowGroupRowCountLimit;
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;
private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
@@ -153,6 +155,7 @@ public class ParquetProperties {
this.maxBloomFilterBytes = builder.maxBloomFilterBytes;
this.adaptiveBloomFilterEnabled =
builder.adaptiveBloomFilterEnabled.build();
this.numBloomFilterCandidates = builder.numBloomFilterCandidates.build();
+ this.rowGroupRowCountLimit = builder.rowGroupRowCountLimit;
this.pageRowCountLimit = builder.pageRowCountLimit;
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
@@ -302,6 +305,10 @@ public class ParquetProperties {
return estimateNextSizeCheck;
}
+ public int getRowGroupRowCountLimit() {
+ return rowGroupRowCountLimit;
+ }
+
public int getPageRowCountLimit() {
return pageRowCountLimit;
}
@@ -400,6 +407,7 @@ public class ParquetProperties {
private final ColumnProperty.Builder<Boolean> adaptiveBloomFilterEnabled;
private final ColumnProperty.Builder<Integer> numBloomFilterCandidates;
private final ColumnProperty.Builder<Boolean> bloomFilterEnabled;
+ private int rowGroupRowCountLimit = DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT;
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled =
DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
private final ColumnProperty.Builder<ByteStreamSplitMode>
byteStreamSplitEnabled;
@@ -679,6 +687,12 @@ public class ParquetProperties {
return this;
}
+ public Builder withRowGroupRowCountLimit(int rowCount) {
+ Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for row groups: %s", rowCount);
+ rowGroupRowCountLimit = rowCount;
+ return this;
+ }
+
public Builder withPageRowCountLimit(int rowCount) {
Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: %s", rowCount);
pageRowCountLimit = rowCount;
diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md
index b0350b881..4826d49e2 100644
--- a/parquet-hadoop/README.md
+++ b/parquet-hadoop/README.md
@@ -266,12 +266,16 @@ conf.set("parquet.bloom.filter.fpp#column.path", 0.02)
---
-
**Property:** `parquet.decrypt.off-heap.buffer.enabled`
**Description:** Whether to use direct buffers to decrypt encrypted files.
This should be set to
true if the reader is using a `DirectByteBufferAllocator`
**Default value:** `false`
+---
+
+**Property:** `parquet.block.row.count.limit`
+**Description:** The maximum number of rows per row group.
+**Default value:** `2147483647` (Integer.MAX_VALUE)
---
**Property:** `parquet.page.row.count.limit`
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 0cc05d6d7..f29628680 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -48,8 +48,8 @@ class InternalParquetRecordWriter<T> {
private final WriteSupport<T> writeSupport;
private final MessageType schema;
private final Map<String, String> extraMetaData;
- private final long rowGroupSize;
private long rowGroupSizeThreshold;
+ private final int rowGroupRecordCountThreshold;
private long nextRowGroupSize;
private final BytesInputCompressor compressor;
private final boolean validating;
@@ -91,8 +91,8 @@ class InternalParquetRecordWriter<T> {
this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport
cannot be null");
this.schema = schema;
this.extraMetaData = extraMetaData;
- this.rowGroupSize = rowGroupSize;
this.rowGroupSizeThreshold = rowGroupSize;
+ this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit();
this.nextRowGroupSize = rowGroupSizeThreshold;
this.compressor = compressor;
this.validating = validating;
@@ -166,9 +166,16 @@ class InternalParquetRecordWriter<T> {
}
private void checkBlockSizeReached() throws IOException {
- if (recordCount
- >= recordCountForNextMemCheck) { // checking the memory size is
relatively expensive, so let's not do it
- // for every record.
+ if (recordCount >= rowGroupRecordCountThreshold) {
+ LOG.debug("record count reaches threshold: flushing {} records to disk.", recordCount);
+ flushRowGroupToStore();
+ initStore();
+ recordCountForNextMemCheck = min(
+ max(props.getMinRowCountForPageSizeCheck(), recordCount / 2),
+ props.getMaxRowCountForPageSizeCheck());
+ this.lastRowGroupEndPos = parquetFileWriter.getPos();
+ } else if (recordCount >= recordCountForNextMemCheck) {
+ // checking the memory size is relatively expensive, so let's not do it for every record.
long memSize = columnStore.getBufferedSize();
long recordSize = memSize / recordCount;
// flush the row group if it is within ~2 records of the limit
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index f00fde9aa..403666868 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -154,6 +154,7 @@ public class ParquetOutputFormat<T> extends
FileOutputFormat<Void, T> {
public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
public static final String ADAPTIVE_BLOOM_FILTER_ENABLED =
"parquet.bloom.filter.adaptive.enabled";
public static final String BLOOM_FILTER_CANDIDATES_NUMBER =
"parquet.bloom.filter.candidates.number";
+ public static final String BLOCK_ROW_COUNT_LIMIT = "parquet.block.row.count.limit";
public static final String PAGE_ROW_COUNT_LIMIT =
"parquet.page.row.count.limit";
public static final String PAGE_WRITE_CHECKSUM_ENABLED =
"parquet.page.write-checksum.enabled";
public static final String STATISTICS_ENABLED =
"parquet.column.statistics.enabled";
@@ -366,6 +367,18 @@ public class ParquetOutputFormat<T> extends
FileOutputFormat<Void, T> {
return conf.getInt(STATISTICS_TRUNCATE_LENGTH,
ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
}
+ public static void setBlockRowCountLimit(JobContext jobContext, int rowCount) {
+ setBlockRowCountLimit(getConfiguration(jobContext), rowCount);
+ }
+
+ public static void setBlockRowCountLimit(Configuration conf, int rowCount) {
+ conf.setInt(BLOCK_ROW_COUNT_LIMIT, rowCount);
+ }
+
+ static int getBlockRowCountLimit(Configuration conf) {
+ return conf.getInt(BLOCK_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT);
+ }
+
public static void setPageRowCountLimit(JobContext jobContext, int rowCount)
{
setPageRowCountLimit(getConfiguration(jobContext), rowCount);
}
@@ -500,6 +513,7 @@ public class ParquetOutputFormat<T> extends
FileOutputFormat<Void, T> {
.withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
.withBloomFilterEnabled(getBloomFilterEnabled(conf))
.withAdaptiveBloomFilterEnabled(getAdaptiveBloomFilterEnabled(conf))
+ .withRowGroupRowCountLimit(getBlockRowCountLimit(conf))
.withPageRowCountLimit(getPageRowCountLimit(conf))
.withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
.withStatisticsEnabled(getStatisticsEnabled(conf));
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index d4c4bc104..7789cad5c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -609,6 +609,17 @@ public class ParquetWriter<T> implements Closeable {
return self();
}
+ /**
+ * Sets the Parquet format row group row count limit used by the constructed writer.
+ *
+ * @param rowCount limit for the number of rows stored in a row group
+ * @return this builder for method chaining
+ */
+ public SELF withRowGroupRowCountLimit(int rowCount) {
+ encodingPropsBuilder.withRowGroupRowCountLimit(rowCount);
+ return self();
+ }
+
/**
* Sets the Parquet format page row count limit used by the constructed
writer.
*
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 739aa85d2..2cd83624f 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -443,8 +443,17 @@ public class TestParquetWriter {
testParquetFileNumberOfBlocks(
ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK,
+ new Configuration(),
1);
- testParquetFileNumberOfBlocks(1, 1, 3);
+ testParquetFileNumberOfBlocks(1, 1, new Configuration(), 3);
+
+ Configuration conf = new Configuration();
+ ParquetOutputFormat.setBlockRowCountLimit(conf, 1);
+ testParquetFileNumberOfBlocks(
+ ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK,
+ ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK,
+ conf,
+ 3);
}
@Test
@@ -506,7 +515,10 @@ public class TestParquetWriter {
}
private void testParquetFileNumberOfBlocks(
- int minRowCountForPageSizeCheck, int maxRowCountForPageSizeCheck, int
expectedNumberOfBlocks)
+ int minRowCountForPageSizeCheck,
+ int maxRowCountForPageSizeCheck,
+ Configuration conf,
+ int expectedNumberOfBlocks)
throws IOException {
MessageType schema = Types.buildMessage()
.required(BINARY)
@@ -514,7 +526,6 @@ public class TestParquetWriter {
.named("str")
.named("msg");
- Configuration conf = new Configuration();
GroupWriteSupport.setSchema(schema, conf);
File file = temp.newFile();
@@ -523,7 +534,8 @@ public class TestParquetWriter {
try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
.withAllocator(allocator)
.withConf(conf)
- // Set row group size to 1, to make sure we flush every time
+ .withRowGroupRowCountLimit(ParquetOutputFormat.getBlockRowCountLimit(conf))
+ // Set row group size to 1, to make sure we flush every time when
// minRowCountForPageSizeCheck or maxRowCountForPageSizeCheck is
exceeded
.withRowGroupSize(1)
.withMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck)