This is an automated email from the ASF dual-hosted git repository.

sungwy pushed a commit to branch sy-pyiceberg-0.7.1
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git

commit 090061440dded34ad283c997832f4a4e6a2a17a1
Author: Fokko Driesprong <[email protected]>
AuthorDate: Thu Aug 8 16:08:30 2024 +0200

    Allow setting `write.parquet.row-group-limit` (#1016)
    
    * Allow setting `write.parquet.row-group-limit`
    
    And update the docs
    
    * Add test
    
    * Make ruff happy
    
    ---------
    
    Co-authored-by: Sung Yun <[email protected]>
---
 mkdocs/docs/configuration.md    | 18 ++++++++++--------
 pyiceberg/io/pyarrow.py         |  4 ++--
 pyiceberg/table/__init__.py     |  2 +-
 tests/integration/test_reads.py | 35 +++++++++++++++++++++++++++++++++++
 4 files changed, 48 insertions(+), 11 deletions(-)

diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index ff374165..ae60de51 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -28,14 +28,16 @@ Iceberg tables support table properties to configure table behavior.
 
 ### Write options
 
-| Key                               | Options                           | Default | Description                                                                                  |
-| --------------------------------- | --------------------------------- | ------- | -------------------------------------------------------------------------------------------- |
-| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd    | Sets the Parquet compression coddec.                                                         |
-| `write.parquet.compression-level` | Integer                           | null    | Parquet compression level for the codec. If not set, it is up to PyIceberg                   |
-| `write.parquet.page-size-bytes`   | Size in bytes                     | 1MB     | Set a target threshold for the approximate encoded size of data pages within a column chunk  |
-| `write.parquet.page-row-limit`    | Number of rows                    | 20000   | Set a target threshold for the approximate encoded size of data pages within a column chunk  |
-| `write.parquet.dict-size-bytes`   | Size in bytes                     | 2MB     | Set the dictionary page size limit per row group                                             |
-| `write.parquet.row-group-limit`   | Number of rows                    | 122880  | The Parquet row group limit                                                                  |
+| Key                                    | Options                           | Default | Description                                                                                  |
+| -------------------------------------- | --------------------------------- | ------- | -------------------------------------------------------------------------------------------- |
+| `write.parquet.compression-codec`      | `{uncompressed,zstd,gzip,snappy}` | zstd    | Sets the Parquet compression coddec.                                                         |
+| `write.parquet.compression-level`      | Integer                           | null    | Parquet compression level for the codec. If not set, it is up to PyIceberg                   |
+| `write.parquet.row-group-limit`        | Number of rows                    | 1048576 | The upper bound of the number of entries within a single row group                           |
+| `write.parquet.page-size-bytes`        | Size in bytes                     | 1MB     | Set a target threshold for the approximate encoded size of data pages within a column chunk  |
+| `write.parquet.page-row-limit`         | Number of rows                    | 20000   | Set a target threshold for the approximate encoded size of data pages within a column chunk  |
+| `write.parquet.dict-size-bytes`        | Size in bytes                     | 2MB     | Set the dictionary page size limit per row group                                             |
+| `write.parquet.row-group-limit`        | Number of rows                    | 122880  | The Parquet row group limit                                                                  |
+| `write.metadata.previous-versions-max` | Integer                           | 100     | The max number of previous version metadata files to keep before deleting after commit.      |
 
 ### Table behavior options
 
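The write option introduced above is an ordinary Iceberg table property. A minimal sketch of setting it at table-creation time, modelled on the integration test further down in this commit (the catalog name, table identifier, and limit value are illustrative only, not part of the change):

    from pyiceberg.catalog import load_catalog
    from pyiceberg.schema import Schema
    from pyiceberg.types import LongType, NestedField

    catalog = load_catalog("default")  # illustrative catalog name

    # Request row groups of at most 1,048,576 rows (the new default) explicitly.
    tbl = catalog.create_table(
        "default.example_row_group_limit",  # illustrative identifier
        Schema(NestedField(1, "number", LongType())),
        properties={"write.parquet.row-group-limit": "1048576"},
    )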
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index f8e54081..33561da5 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2137,8 +2137,8 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
     parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
     row_group_size = PropertyUtil.property_as_int(
         properties=table_metadata.properties,
-        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
-        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
+        property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
+        default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
     )
 
     def write_parquet(task: WriteTask) -> DataFile:
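Downstream of this change, the resolved `row_group_size` is handed to PyArrow when the Parquet file is written. A standalone sketch of that mechanism, not the actual `write_file` code path (output path and data are made up):

    import pyarrow as pa
    import pyarrow.parquet as pq

    arrow_table = pa.table({"number": list(range(10))})
    row_group_limit = 1  # e.g. the resolved value of write.parquet.row-group-limit

    with pq.ParquetWriter("/tmp/example.parquet", arrow_table.schema) as writer:
        # PyArrow caps each row group at `row_group_size` rows.
        writer.write_table(arrow_table, row_group_size=row_group_limit)

    print(pq.ParquetFile("/tmp/example.parquet").num_row_groups)  # 10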
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index a7a2dec2..6ea78679 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -173,7 +173,7 @@ class TableProperties:
     PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024  # 128 MB
 
     PARQUET_ROW_GROUP_LIMIT = "write.parquet.row-group-limit"
-    PARQUET_ROW_GROUP_LIMIT_DEFAULT = 128 * 1024 * 1024  # 128 MB
+    PARQUET_ROW_GROUP_LIMIT_DEFAULT = 1048576
 
     PARQUET_PAGE_SIZE_BYTES = "write.parquet.page-size-bytes"
     PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024  # 1 MB
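Note the semantics of the new default: the removed value reused the 128 MB byte count even though this property counts rows, while 1048576 is 1024 * 1024 rows, matching the docs table above. A hypothetical illustration of the fallback when the property is unset (the real lookup goes through `PropertyUtil.property_as_int`, as shown in the `pyarrow.py` hunk):

    # Hypothetical stand-in for PropertyUtil.property_as_int.
    properties: dict = {}  # table with no explicit write.parquet.row-group-limit
    row_group_limit = int(properties.get("write.parquet.row-group-limit", 1024 * 1024))
    print(row_group_limit)  # 1048576 rows per row group by default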
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 078ec163..eb5cca87 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -46,6 +46,7 @@ from pyiceberg.table import Table
 from pyiceberg.types import (
     BooleanType,
     IntegerType,
+    LongType,
     NestedField,
     StringType,
     TimestampType,
@@ -666,6 +667,40 @@ def test_hive_locking_with_retry(session_catalog_hive: HiveCatalog) -> None:
 
 
 @pytest.mark.integration
+def test_configure_row_group_batch_size(session_catalog: Catalog) -> None:
+    from pyiceberg.table import TableProperties
+
+    table_name = "default.test_small_row_groups"
+    try:
+        session_catalog.drop_table(table_name)
+    except NoSuchTableError:
+        pass  # Just to make sure that the table doesn't exist
+
+    tbl = session_catalog.create_table(
+        table_name,
+        Schema(
+            NestedField(1, "number", LongType()),
+        ),
+        properties={TableProperties.PARQUET_ROW_GROUP_LIMIT: "1"},
+    )
+
+    # Write 10 row groups, that should end up as 10 batches
+    entries = 10
+    tbl.append(
+        pa.Table.from_pylist(
+            [
+                {
+                    "number": number,
+                }
+                for number in range(entries)
+            ],
+        )
+    )
+
+    batches = list(tbl.scan().to_arrow_batch_reader())
+    assert len(batches) == entries
+
+
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
 def test_empty_scan_ordered_str(catalog: Catalog) -> None:
     table_empty_scan_ordered_str = catalog.load_table("default.test_empty_scan_ordered_str")
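As a possible follow-up to the batch-count assertion in the new test, the row-group split can also be checked at the Parquet level, assuming the data files planned by the scan are readable by PyArrow (for example a local warehouse path); this sketch is not part of the commit:

    import pyarrow.parquet as pq

    for task in tbl.scan().plan_files():
        parquet_file = pq.ParquetFile(task.file.file_path)
        # With write.parquet.row-group-limit set to "1", every row group holds one row.
        print(task.file.file_path, parquet_file.num_row_groups)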
