This is an automated email from the ASF dual-hosted git repository.

sungwy pushed a commit to branch sy-pyiceberg-0.7.1
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
commit 090061440dded34ad283c997832f4a4e6a2a17a1
Author: Fokko Driesprong <[email protected]>
AuthorDate: Thu Aug 8 16:08:30 2024 +0200

    Allow setting `write.parquet.row-group-limit` (#1016)

    * Allow setting `write.parquet.row-group-limit`

    And update the docs

    * Add test

    * Make ruff happy

    ---------

    Co-authored-by: Sung Yun <[email protected]>
---
 mkdocs/docs/configuration.md    | 18 ++++++++++--------
 pyiceberg/io/pyarrow.py         |  4 ++--
 pyiceberg/table/__init__.py     |  2 +-
 tests/integration/test_reads.py | 35 +++++++++++++++++++++++++++++++++++
 4 files changed, 48 insertions(+), 11 deletions(-)

diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index ff374165..ae60de51 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -28,14 +28,16 @@ Iceberg tables support table properties to configure table behavior.
 
 ### Write options
 
-| Key | Options | Default | Description |
-| --------------------------------- | --------------------------------- | ------- | ------------------------------------------------------------------------------------------- |
-| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression coddec. |
-| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, it is up to PyIceberg |
-| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
-| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the approximate encoded size of data pages within a column chunk |
-| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
-| `write.parquet.row-group-limit` | Number of rows | 122880 | The Parquet row group limit |
+| Key | Options | Default | Description |
+| -------------------------------------- | --------------------------------- | ------- | ------------------------------------------------------------------------------------------- |
+| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression coddec. |
+| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, it is up to PyIceberg |
+| `write.parquet.row-group-limit` | Number of rows | 1048576 | The upper bound of the number of entries within a single row group |
+| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
+| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the approximate encoded size of data pages within a column chunk |
+| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
+| `write.parquet.row-group-limit` | Number of rows | 122880 | The Parquet row group limit |
+| `write.metadata.previous-versions-max` | Integer | 100 | The max number of previous version metadata files to keep before deleting after commit. |
 
 ### Table behavior options

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index f8e54081..33561da5 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2137,8 +2137,8 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
     parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
     row_group_size = PropertyUtil.property_as_int(
         properties=table_metadata.properties,
-        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
-        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
+        property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
+        default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
     )
 
     def write_parquet(task: WriteTask) -> DataFile:

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index a7a2dec2..6ea78679 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -173,7 +173,7 @@ class TableProperties:
     PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024  # 128 MB
 
     PARQUET_ROW_GROUP_LIMIT = "write.parquet.row-group-limit"
-    PARQUET_ROW_GROUP_LIMIT_DEFAULT = 128 * 1024 * 1024  # 128 MB
+    PARQUET_ROW_GROUP_LIMIT_DEFAULT = 1048576
 
     PARQUET_PAGE_SIZE_BYTES = "write.parquet.page-size-bytes"
     PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024  # 1 MB

diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 078ec163..eb5cca87 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -46,6 +46,7 @@ from pyiceberg.table import Table
 from pyiceberg.types import (
     BooleanType,
     IntegerType,
+    LongType,
     NestedField,
     StringType,
     TimestampType,
@@ -666,6 +667,40 @@ def test_hive_locking_with_retry(session_catalog_hive: HiveCatalog) -> None:
 
 
 @pytest.mark.integration
+def test_configure_row_group_batch_size(session_catalog: Catalog) -> None:
+    from pyiceberg.table import TableProperties
+
+    table_name = "default.test_small_row_groups"
+    try:
+        session_catalog.drop_table(table_name)
+    except NoSuchTableError:
+        pass  # Just to make sure that the table doesn't exist
+
+    tbl = session_catalog.create_table(
+        table_name,
+        Schema(
+            NestedField(1, "number", LongType()),
+        ),
+        properties={TableProperties.PARQUET_ROW_GROUP_LIMIT: "1"},
+    )
+
+    # Write 10 row groups, that should end up as 10 batches
+    entries = 10
+    tbl.append(
+        pa.Table.from_pylist(
+            [
+                {
+                    "number": number,
+                }
+                for number in range(entries)
+            ],
+        )
+    )
+
+    batches = list(tbl.scan().to_arrow_batch_reader())
+    assert len(batches) == entries
+
+
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
 def test_empty_scan_ordered_str(catalog: Catalog) -> None:
     table_empty_scan_ordered_str = catalog.load_table("default.test_empty_scan_ordered_str")
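
For reference, a minimal sketch of how the new property can be set from the client side, modeled on the integration test in this patch. The catalog name "default" and its configuration are assumptions; the property key, TableProperties.PARQUET_ROW_GROUP_LIMIT, append(), and to_arrow_batch_reader() come from the patch itself:

import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
from pyiceberg.types import LongType, NestedField

# Assumption: a catalog named "default" is already configured (e.g. in ~/.pyiceberg.yaml)
catalog = load_catalog("default")

# Cap every Parquet row group at 1000 rows instead of the new default of 1048576
tbl = catalog.create_table(
    "default.small_row_groups",
    Schema(NestedField(1, "number", LongType())),
    properties={TableProperties.PARQUET_ROW_GROUP_LIMIT: "1000"},
)

# Append 10000 rows; with the limit above the writer should emit roughly 10 row groups,
# which the scan then surfaces as separate Arrow record batches
tbl.append(pa.Table.from_pylist([{"number": n} for n in range(10_000)]))
print(len(list(tbl.scan().to_arrow_batch_reader())))

Note that the value is stored as a string in the table properties, which is why the test (and the sketch above) pass "1" and "1000" rather than integers.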

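A small sketch of what the pyiceberg/io/pyarrow.py change does at write time: the string property is resolved to an int and, when unset, falls back to the new 1048576-row default. The import location of PropertyUtil (pyiceberg.table on this branch) is an assumption; the keyword arguments mirror the hunk above:

# Assumption: PropertyUtil is importable from pyiceberg.table in this version of the code base
from pyiceberg.table import PropertyUtil, TableProperties

# With the property set on the table, its string value is parsed and used ...
row_group_size = PropertyUtil.property_as_int(
    properties={"write.parquet.row-group-limit": "1000"},
    property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
    default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
)
assert row_group_size == 1000

# ... and without it the writer falls back to PARQUET_ROW_GROUP_LIMIT_DEFAULT (1048576 rows)
assert PropertyUtil.property_as_int(
    properties={},
    property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
    default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
) == 1048576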