This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new e8bdc872 refactor partition_summary_limit into SnapshotSummaryCollector constr… (#1940)
e8bdc872 is described below

commit e8bdc87234b02c66aac972849a0d1438f1fa5e4e
Author: stevie9868 <[email protected]>
AuthorDate: Mon Apr 28 10:59:30 2025 -0700

    refactor partition_summary_limit into SnapshotSummaryCollector constr… (#1940)
    
    <!--
    Thanks for opening a pull request!
    -->
    
    <!-- In the case this PR will resolve an issue, please replace
    ${GITHUB_ISSUE_ID} below with the actual Github issue id. -->
    Closes #1779
    
    # Rationale for this change
    See [issue](https://github.com/apache/iceberg-python/issues/1779#issuecomment-2816700603)
    # Are these changes tested?
    Tested locally
    # Are there any user-facing changes?
    No
    
    <!-- In the case of user-facing changes, please add the changelog label.
    -->
    
    ---------
    
    Co-authored-by: Yingjian Wu <[email protected]>
---
 pyiceberg/table/snapshots.py       |  4 ++--
 pyiceberg/table/update/snapshot.py |  3 +--
 tests/table/test_snapshots.py      | 35 +++++++++++++++++++++++++++++++++++
 3 files changed, 38 insertions(+), 4 deletions(-)

diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index af3f0404..1e48126c 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -272,10 +272,10 @@ class SnapshotSummaryCollector:
     partition_metrics: DefaultDict[str, UpdateMetrics]
     max_changed_partitions_for_summaries: int
 
-    def __init__(self) -> None:
+    def __init__(self, partition_summary_limit: int = 0) -> None:
         self.metrics = UpdateMetrics()
         self.partition_metrics = defaultdict(UpdateMetrics)
-        self.max_changed_partitions_for_summaries = 0
+        self.max_changed_partitions_for_summaries = partition_summary_limit
 
     def set_partition_summary_limit(self, limit: int) -> None:
         self.max_changed_partitions_for_summaries = limit
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index b53c3317..a8216774 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -203,13 +203,12 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
     def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
         from pyiceberg.table import TableProperties
 
-        ssc = SnapshotSummaryCollector()
         partition_summary_limit = int(
             self._transaction.table_metadata.properties.get(
                 TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
             )
         )
-        ssc.set_partition_summary_limit(partition_summary_limit)
+        ssc = SnapshotSummaryCollector(partition_summary_limit=partition_summary_limit)
 
         for data_file in self._added_data_files:
             ssc.add_file(
diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py
index 24d5f0ff..3f0dae14 100644
--- a/tests/table/test_snapshots.py
+++ b/tests/table/test_snapshots.py
@@ -218,6 +218,41 @@ def test_snapshot_summary_collector_with_partition() -> None:
     }
 
 
+@pytest.mark.integration
+def test_snapshot_summary_collector_with_partition_limit_in_constructor() -> None:
+    # Given
+    partition_summary_limit = 10
+    ssc = SnapshotSummaryCollector(partition_summary_limit=partition_summary_limit)
+
+    assert ssc.build() == {}
+    schema = Schema(
+        NestedField(field_id=1, name="bool_field", field_type=BooleanType(), required=False),
+        NestedField(field_id=2, name="string_field", field_type=StringType(), required=False),
+        NestedField(field_id=3, name="int_field", field_type=IntegerType(), required=False),
+    )
+    spec = PartitionSpec(PartitionField(source_id=3, field_id=1001, transform=IdentityTransform(), name="int_field"))
+    data_file_1 = DataFile.from_args(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record(1))
+    data_file_2 = DataFile.from_args(content=DataFileContent.DATA, record_count=200, file_size_in_bytes=4321, partition=Record(2))
+
+    # When
+    ssc.add_file(data_file=data_file_1, schema=schema, partition_spec=spec)
+    ssc.remove_file(data_file=data_file_1, schema=schema, partition_spec=spec)
+    ssc.remove_file(data_file=data_file_2, schema=schema, partition_spec=spec)
+
+    # Then
+    assert ssc.build() == {
+        "added-files-size": "1234",
+        "removed-files-size": "5555",
+        "added-data-files": "1",
+        "deleted-data-files": "2",
+        "added-records": "100",
+        "deleted-records": "300",
+        "changed-partition-count": "2",
+        "partitions.int_field=1": "added-files-size=1234,removed-files-size=1234,added-data-files=1,deleted-data-files=1,added-records=100,deleted-records=100",
+        "partitions.int_field=2": "removed-files-size=4321,deleted-data-files=1,deleted-records=200",
+    }
+
+
 def test_merge_snapshot_summaries_empty() -> None:
     assert update_snapshot_summaries(Summary(Operation.APPEND)) == Summary(
         operation=Operation.APPEND,

Reply via email to