This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3d00780f5 Parquet: Add option to set page row count limit (#5345)
3d00780f5 is described below
commit 3d00780f5f508ac7a0f475fd0e1ddfd736000c5b
Author: Bryan Keller <[email protected]>
AuthorDate: Thu Jul 28 10:59:05 2022 -0700
Parquet: Add option to set page row count limit (#5345)
---
.../java/org/apache/iceberg/TableProperties.java | 4 ++++
docs/configuration.md | 1 +
.../java/org/apache/iceberg/parquet/Parquet.java | 25 ++++++++++++++++++++++
3 files changed, 30 insertions(+)
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index a8ca36379..06960939c 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -131,6 +131,10 @@ public class TableProperties {
"write.delete.parquet.page-size-bytes";
public static final int PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024; // 1 MB
+ public static final String PARQUET_PAGE_ROW_LIMIT =
"write.parquet.page-row-limit";
+ public static final String DELETE_PARQUET_PAGE_ROW_LIMIT =
"write.delete.parquet.page-row-limit";
+ public static final int PARQUET_PAGE_ROW_LIMIT_DEFAULT = 20_000;
+
public static final String PARQUET_DICT_SIZE_BYTES =
"write.parquet.dict-size-bytes";
public static final String DELETE_PARQUET_DICT_SIZE_BYTES =
"write.delete.parquet.dict-size-bytes";
diff --git a/docs/configuration.md b/docs/configuration.md
index eab539ac5..a82ae0c81 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -52,6 +52,7 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.delete.format.default | data file format | Default delete file format for the table; parquet, avro, or orc |
| write.parquet.row-group-size-bytes | 134217728 (128 MB) | Parquet row group size |
| write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size |
+| write.parquet.page-row-limit | 20000 | Parquet page row limit |
| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
| write.parquet.compression-codec | gzip | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
| write.parquet.compression-level | null | Parquet compression level |
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index ac8856b92..856c8089b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.parquet;
import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
import static
org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
import static
org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_ROW_LIMIT;
import static
org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_SIZE_BYTES;
import static
org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
import static
org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
@@ -34,6 +35,8 @@ import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
import static
org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES;
import static
org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT;
+import static
org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES;
import static
org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT;
import static
org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
@@ -240,6 +243,7 @@ public class Parquet {
int rowGroupSize = context.rowGroupSize();
int pageSize = context.pageSize();
+ int pageRowLimit = context.pageRowLimit();
int dictionaryPageSize = context.dictionaryPageSize();
String compressionLevel = context.compressionLevel();
CompressionCodecName codec = context.codec();
@@ -281,6 +285,7 @@ public class Parquet {
ParquetProperties.builder()
.withWriterVersion(writerVersion)
.withPageSize(pageSize)
+ .withPageRowCountLimit(pageRowLimit)
.withDictionaryPageSize(dictionaryPageSize)
.withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount)
.withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
@@ -317,6 +322,7 @@ public class Parquet {
.withWriteMode(writeMode)
.withRowGroupSize(rowGroupSize)
.withPageSize(pageSize)
+ .withPageRowCountLimit(pageRowLimit)
.withDictionaryPageSize(dictionaryPageSize);
for (Map.Entry<String, String> entry :
columnBloomFilterEnabled.entrySet()) {
@@ -332,6 +338,7 @@ public class Parquet {
private static class Context {
private final int rowGroupSize;
private final int pageSize;
+ private final int pageRowLimit;
private final int dictionaryPageSize;
private final CompressionCodecName codec;
private final String compressionLevel;
@@ -343,6 +350,7 @@ public class Parquet {
private Context(
int rowGroupSize,
int pageSize,
+ int pageRowLimit,
int dictionaryPageSize,
CompressionCodecName codec,
String compressionLevel,
@@ -352,6 +360,7 @@ public class Parquet {
Map<String, String> columnBloomFilterEnabled) {
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
+ this.pageRowLimit = pageRowLimit;
this.dictionaryPageSize = dictionaryPageSize;
this.codec = codec;
this.compressionLevel = compressionLevel;
@@ -372,6 +381,11 @@ public class Parquet {
config, PARQUET_PAGE_SIZE_BYTES,
PARQUET_PAGE_SIZE_BYTES_DEFAULT);
Preconditions.checkArgument(pageSize > 0, "Page size must be > 0");
+ int pageRowLimit =
+ PropertyUtil.propertyAsInt(
+ config, PARQUET_PAGE_ROW_LIMIT,
PARQUET_PAGE_ROW_LIMIT_DEFAULT);
+ Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit
must be > 0");
+
int dictionaryPageSize =
PropertyUtil.propertyAsInt(
config, PARQUET_DICT_SIZE_BYTES,
PARQUET_DICT_SIZE_BYTES_DEFAULT);
@@ -414,6 +428,7 @@ public class Parquet {
return new Context(
rowGroupSize,
pageSize,
+ pageRowLimit,
dictionaryPageSize,
codec,
compressionLevel,
@@ -437,6 +452,11 @@ public class Parquet {
config, DELETE_PARQUET_PAGE_SIZE_BYTES,
dataContext.pageSize());
Preconditions.checkArgument(pageSize > 0, "Page size must be > 0");
+ int pageRowLimit =
+ PropertyUtil.propertyAsInt(
+ config, DELETE_PARQUET_PAGE_ROW_LIMIT,
dataContext.pageRowLimit());
+ Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit
must be > 0");
+
int dictionaryPageSize =
PropertyUtil.propertyAsInt(
config, DELETE_PARQUET_DICT_SIZE_BYTES,
dataContext.dictionaryPageSize());
@@ -479,6 +499,7 @@ public class Parquet {
return new Context(
rowGroupSize,
pageSize,
+ pageRowLimit,
dictionaryPageSize,
codec,
compressionLevel,
@@ -504,6 +525,10 @@ public class Parquet {
return pageSize;
}
+ int pageRowLimit() {
+ return pageRowLimit;
+ }
+
int dictionaryPageSize() {
return dictionaryPageSize;
}