This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new e8bdc872 refactor partition_summary_limit into
SnapshotSummaryCollector constructor (#1940)
e8bdc872 is described below
commit e8bdc87234b02c66aac972849a0d1438f1fa5e4e
Author: stevie9868 <[email protected]>
AuthorDate: Mon Apr 28 10:59:30 2025 -0700
refactor partition_summary_limit into SnapshotSummaryCollector constructor
(#1940)
<!--
Thanks for opening a pull request!
-->
<!-- In the case this PR will resolve an issue, please replace
${GITHUB_ISSUE_ID} below with the actual Github issue id. -->
Closes #1779
# Rationale for this change
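Pass `partition_summary_limit` directly to the `SnapshotSummaryCollector`
constructor, instead of creating the collector and then calling
`set_partition_summary_limit()` on it. See the discussion in
[issue #1779](https://github.com/apache/iceberg-python/issues/1779#issuecomment-2816700603).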
# Are these changes tested?
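Tested locally. Added a unit test,
`test_snapshot_summary_collector_with_partition_limit_in_constructor`, in
`tests/table/test_snapshots.py`.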
# Are there any user-facing changes?
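No. The new `partition_summary_limit` constructor argument defaults to 0,
the same value the collector previously hard-coded.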
<!-- In the case of user-facing changes, please add the changelog label.
-->
---------
Co-authored-by: Yingjian Wu <[email protected]>
---
pyiceberg/table/snapshots.py | 4 ++--
pyiceberg/table/update/snapshot.py | 3 +--
tests/table/test_snapshots.py | 35 +++++++++++++++++++++++++++++++++++
3 files changed, 38 insertions(+), 4 deletions(-)
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index af3f0404..1e48126c 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -272,10 +272,10 @@ class SnapshotSummaryCollector:
partition_metrics: DefaultDict[str, UpdateMetrics]
max_changed_partitions_for_summaries: int
- def __init__(self) -> None:
+ def __init__(self, partition_summary_limit: int = 0) -> None:
self.metrics = UpdateMetrics()
self.partition_metrics = defaultdict(UpdateMetrics)
- self.max_changed_partitions_for_summaries = 0
+ self.max_changed_partitions_for_summaries = partition_summary_limit
def set_partition_summary_limit(self, limit: int) -> None:
self.max_changed_partitions_for_summaries = limit
diff --git a/pyiceberg/table/update/snapshot.py
b/pyiceberg/table/update/snapshot.py
index b53c3317..a8216774 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -203,13 +203,12 @@ class _SnapshotProducer(UpdateTableMetadata[U],
Generic[U]):
def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) ->
Summary:
from pyiceberg.table import TableProperties
- ssc = SnapshotSummaryCollector()
partition_summary_limit = int(
self._transaction.table_metadata.properties.get(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT,
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
)
)
- ssc.set_partition_summary_limit(partition_summary_limit)
+ ssc =
SnapshotSummaryCollector(partition_summary_limit=partition_summary_limit)
for data_file in self._added_data_files:
ssc.add_file(
diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py
index 24d5f0ff..3f0dae14 100644
--- a/tests/table/test_snapshots.py
+++ b/tests/table/test_snapshots.py
@@ -218,6 +218,41 @@ def test_snapshot_summary_collector_with_partition() ->
None:
}
[email protected]
+def test_snapshot_summary_collector_with_partition_limit_in_constructor() ->
None:
+ # Given
+ partition_summary_limit = 10
+ ssc =
SnapshotSummaryCollector(partition_summary_limit=partition_summary_limit)
+
+ assert ssc.build() == {}
+ schema = Schema(
+ NestedField(field_id=1, name="bool_field", field_type=BooleanType(),
required=False),
+ NestedField(field_id=2, name="string_field", field_type=StringType(),
required=False),
+ NestedField(field_id=3, name="int_field", field_type=IntegerType(),
required=False),
+ )
+ spec = PartitionSpec(PartitionField(source_id=3, field_id=1001,
transform=IdentityTransform(), name="int_field"))
+ data_file_1 = DataFile.from_args(content=DataFileContent.DATA,
record_count=100, file_size_in_bytes=1234, partition=Record(1))
+ data_file_2 = DataFile.from_args(content=DataFileContent.DATA,
record_count=200, file_size_in_bytes=4321, partition=Record(2))
+
+ # When
+ ssc.add_file(data_file=data_file_1, schema=schema, partition_spec=spec)
+ ssc.remove_file(data_file=data_file_1, schema=schema, partition_spec=spec)
+ ssc.remove_file(data_file=data_file_2, schema=schema, partition_spec=spec)
+
+ # Then
+ assert ssc.build() == {
+ "added-files-size": "1234",
+ "removed-files-size": "5555",
+ "added-data-files": "1",
+ "deleted-data-files": "2",
+ "added-records": "100",
+ "deleted-records": "300",
+ "changed-partition-count": "2",
+ "partitions.int_field=1":
"added-files-size=1234,removed-files-size=1234,added-data-files=1,deleted-data-files=1,added-records=100,deleted-records=100",
+ "partitions.int_field=2":
"removed-files-size=4321,deleted-data-files=1,deleted-records=200",
+ }
+
+
def test_merge_snapshot_summaries_empty() -> None:
assert update_snapshot_summaries(Summary(Operation.APPEND)) == Summary(
operation=Operation.APPEND,