kevinjqliu commented on code in PR #3124:
URL: https://github.com/apache/iceberg-python/pull/3124#discussion_r2897431358


##########
pyiceberg/table/maintenance.py:
##########
@@ -43,3 +43,26 @@ def expire_snapshots(self) -> ExpireSnapshots:
         from pyiceberg.table.update.snapshot import ExpireSnapshots
 
         return ExpireSnapshots(transaction=Transaction(self.tbl, 
autocommit=True))
+
+    def compact(self) -> None:
+        """Compact the table's data files by reading and overwriting the 
entire table.

Review Comment:
   This should say "data and delete files" — but generally speaking, it compacts the
entire table.



##########
tests/table/test_maintenance.py:
##########
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import random
+
+import pyarrow as pa
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.exceptions import NoSuchNamespaceError
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.transforms import IdentityTransform
+
+
+def test_maintenance_compact(catalog: Catalog) -> None:
+    # Setup Schema and specs
+    from pyiceberg.types import LongType, NestedField, StringType
+
+    schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "category", StringType()),
+        NestedField(3, "value", LongType()),
+    )
+    spec = PartitionSpec(PartitionField(source_id=2, field_id=1000, 
transform=IdentityTransform(), name="category"))
+
+    # Create the namespace and table
+    try:
+        catalog.create_namespace("default")
+    except NoSuchNamespaceError:
+        pass
+    table = catalog.create_table(
+        "default.test_compaction",
+        schema=schema,
+        partition_spec=spec,
+    )
+
+    # Append many small data files
+    categories = ["cat1", "cat2", "cat3"]
+    for i in range(12):
+        table.append(
+            pa.table(
+                {
+                    "id": list(range(i * 10, (i + 1) * 10)),
+                    "category": [categories[i % 3]] * 10,
+                    "value": [random.randint(1, 100) for _ in range(10)],
+                }
+            )
+        )
+
+    # Verify state before compaction
+    before_files = list(table.scan().plan_files())
+    assert len(before_files) == 12
+    assert table.scan().to_arrow().num_rows == 120
+
+    # Execute Compaction
+    table.maintenance.compact()
+
+    # Verify state after compaction
+    table.refresh()
+    after_files = list(table.scan().plan_files())
+    assert len(after_files) == 3  # Should be 1 optimized data file per 
partition
+    assert table.scan().to_arrow().num_rows == 120
+

Review Comment:
   Since it's a small result set, we should also verify that the data is unchanged.



##########
pyiceberg/table/maintenance.py:
##########
@@ -43,3 +43,26 @@ def expire_snapshots(self) -> ExpireSnapshots:
         from pyiceberg.table.update.snapshot import ExpireSnapshots
 
         return ExpireSnapshots(transaction=Transaction(self.tbl, 
autocommit=True))
+
+    def compact(self) -> None:
+        """Compact the table's data files by reading and overwriting the 
entire table.
+
+        Note: This is a full-table compaction that leverages Arrow for 
binpacking.
+        It currently reads the entire table into memory via `.to_arrow()`.
+
+        This reads all existing data into memory and writes it back out using 
the
+        target file size settings (write.target-file-size-bytes), atomically
+        dropping the old files and replacing them with fewer, larger files.
+        """
+        # Read the current table state into memory
+        arrow_table = self.tbl.scan().to_arrow()
+
+        # Guard: if the table is completely empty, there's nothing to compact.
+        # Doing an overwrite with an empty table would result in deleting 
everything.
+        if arrow_table.num_rows == 0:
+            logger.info("Table contains no rows, skipping compaction.")
+            return
+
+        # Overwrite the table atomically (REPLACE operation)
+        with self.tbl.transaction() as txn:
+            txn.overwrite(arrow_table, snapshot_properties={"snapshot-type": 
"replace", "replace-operation": "compaction"})

Review Comment:
   I think we should use a `replace` operation instead:
   
https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/DataOperations.html#REPLACE
   
   We might want to implement `.replace()` first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to