This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d949af080e [python] Support truncate in table_commit (#7511)
d949af080e is described below

commit d949af080ea2424887b8a02354b0051b54b9f8aa
Author: xuzifu666 <[email protected]>
AuthorDate: Tue Mar 24 19:36:40 2026 +0800

    [python] Support truncate in table_commit (#7511)
---
 .../pypaimon/tests/table/file_store_table_test.py  | 99 ++++++++++++++++++++++
 paimon-python/pypaimon/write/file_store_commit.py  | 11 +++
 paimon-python/pypaimon/write/table_commit.py       |  5 ++
 3 files changed, 115 insertions(+)

diff --git a/paimon-python/pypaimon/tests/table/file_store_table_test.py b/paimon-python/pypaimon/tests/table/file_store_table_test.py
index d30846a2ae..d666f327f1 100644
--- a/paimon-python/pypaimon/tests/table/file_store_table_test.py
+++ b/paimon-python/pypaimon/tests/table/file_store_table_test.py
@@ -338,3 +338,102 @@ class FileStoreTableTest(unittest.TestCase):
         # Verify other properties are preserved
         self.assertEqual(copied_table.identifier, self.table.identifier)
         self.assertEqual(copied_table.table_path, self.table.table_path)
+
+    def test_truncate_table(self):
+        """Test truncate_table functionality."""
+        # Create a write builder
+        write_builder = self.table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        # Write some data (use int32 for user_id to match table schema)
+        data = pa.Table.from_pydict({
+            'user_id': [1, 2, 3],
+            'item_id': [100, 200, 300],
+            'behavior': ['view', 'click', 'purchase'],
+            'dt': ['2024-01-01', '2024-01-02', '2024-01-03']
+        }, schema=self.pa_schema)
+        table_write.write_arrow(data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # Verify data was written
+        read_builder = self.table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result_before = table_read.to_arrow(table_scan.plan().splits())
+        self.assertEqual(result_before.num_rows, 3)
+
+        # Truncate the table
+        write_builder = self.table.new_batch_write_builder()
+        table_commit = write_builder.new_commit()
+        table_commit.truncate_table()
+        table_commit.close()
+
+        # Verify data is truncated
+        read_builder = self.table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result_after = table_read.to_arrow(table_scan.plan().splits())
+        self.assertEqual(result_after.num_rows, 0)
+
+        # Verify new snapshot was created
+        snapshot = self.table.snapshot_manager().get_latest_snapshot()
+        self.assertIsNotNone(snapshot)
+        self.assertEqual(snapshot.commit_kind, 'OVERWRITE')
+
+    def test_truncate_empty_table(self):
+        """Test truncate_table on an empty table."""
+        # Truncate empty table - should still work
+        write_builder = self.table.new_batch_write_builder()
+        table_commit = write_builder.new_commit()
+        table_commit.truncate_table()
+        table_commit.close()
+
+        # Verify table is still empty
+        read_builder = self.table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_arrow(table_scan.plan().splits())
+        self.assertEqual(result.num_rows, 0)
+
+    def test_truncate_table_multiple_times(self):
+        """Test multiple consecutive truncate operations."""
+        # Write and truncate multiple times
+        for i in range(3):
+            # Write some data
+            write_builder = self.table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+
+            data = pa.Table.from_pydict({
+                'user_id': [i, i + 1],
+                'item_id': [i * 100, (i + 1) * 100],
+                'behavior': ['view', 'click'],
+                'dt': [f'2024-01-{i+1:02d}', f'2024-01-{i+2:02d}']
+            }, schema=self.pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+            # Verify data was written
+            read_builder = self.table.new_read_builder()
+            table_scan = read_builder.new_scan()
+            table_read = read_builder.new_read()
+            result = table_read.to_arrow(table_scan.plan().splits())
+            self.assertEqual(result.num_rows, 2)
+
+            # Truncate
+            write_builder = self.table.new_batch_write_builder()
+            table_commit = write_builder.new_commit()
+            table_commit.truncate_table()
+            table_commit.close()
+
+            # Verify data is truncated
+            read_builder = self.table.new_read_builder()
+            table_scan = read_builder.new_scan()
+            table_read = read_builder.new_read()
+            result = table_read.to_arrow(table_scan.plan().splits())
+            self.assertEqual(result.num_rows, 0)
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 924e8446f8..829bcaa0b7 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -228,6 +228,17 @@ class FileStoreCommit:
             allow_rollback=False,
         )
 
+    def truncate_table(self, commit_identifier: int) -> None:
+        """Truncate the entire table, deleting all data."""
+        self._try_commit(
+            commit_kind="OVERWRITE",
+            commit_identifier=commit_identifier,
+            commit_entries_plan=lambda snapshot: self._generate_overwrite_entries(
+                snapshot, None, []),
+            detect_conflicts=True,
+            allow_rollback=False,
+        )
+
     def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan,
                     detect_conflicts=False, allow_rollback=False):
 
diff --git a/paimon-python/pypaimon/write/table_commit.py b/paimon-python/pypaimon/write/table_commit.py
index 1eafafefc0..19918c1782 100644
--- a/paimon-python/pypaimon/write/table_commit.py
+++ b/paimon-python/pypaimon/write/table_commit.py
@@ -83,6 +83,11 @@ class BatchTableCommit(TableCommit):
     def commit(self, commit_messages: List[CommitMessage]):
         self._commit(commit_messages, BATCH_COMMIT_IDENTIFIER)
 
+    def truncate_table(self) -> None:
+        """Truncate the entire table, deleting all data."""
+        self._check_committed()
+        self.file_store_commit.truncate_table(BATCH_COMMIT_IDENTIFIER)
+
     def truncate_partitions(self, partitions: List[Dict[str, str]]) -> None:
         self._check_committed()
         self.file_store_commit.drop_partitions(partitions, BATCH_COMMIT_IDENTIFIER)

Reply via email to