This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d949af080e [python] Support truncate in table_commit (#7511)
d949af080e is described below
commit d949af080ea2424887b8a02354b0051b54b9f8aa
Author: xuzifu666 <[email protected]>
AuthorDate: Tue Mar 24 19:36:40 2026 +0800
[python] Support truncate in table_commit (#7511)
---
.../pypaimon/tests/table/file_store_table_test.py | 99 ++++++++++++++++++++++
paimon-python/pypaimon/write/file_store_commit.py | 11 +++
paimon-python/pypaimon/write/table_commit.py | 5 ++
3 files changed, 115 insertions(+)
diff --git a/paimon-python/pypaimon/tests/table/file_store_table_test.py b/paimon-python/pypaimon/tests/table/file_store_table_test.py
index d30846a2ae..d666f327f1 100644
--- a/paimon-python/pypaimon/tests/table/file_store_table_test.py
+++ b/paimon-python/pypaimon/tests/table/file_store_table_test.py
@@ -338,3 +338,102 @@ class FileStoreTableTest(unittest.TestCase):
# Verify other properties are preserved
self.assertEqual(copied_table.identifier, self.table.identifier)
self.assertEqual(copied_table.table_path, self.table.table_path)
+
+ def test_truncate_table(self):
+ """Test truncate_table functionality."""
+ # Create a write builder
+ write_builder = self.table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ # Write some data (use int32 for user_id to match table schema)
+ data = pa.Table.from_pydict({
+ 'user_id': [1, 2, 3],
+ 'item_id': [100, 200, 300],
+ 'behavior': ['view', 'click', 'purchase'],
+ 'dt': ['2024-01-01', '2024-01-02', '2024-01-03']
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Verify data was written
+ read_builder = self.table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result_before = table_read.to_arrow(table_scan.plan().splits())
+ self.assertEqual(result_before.num_rows, 3)
+
+ # Truncate the table
+ write_builder = self.table.new_batch_write_builder()
+ table_commit = write_builder.new_commit()
+ table_commit.truncate_table()
+ table_commit.close()
+
+ # Verify data is truncated
+ read_builder = self.table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result_after = table_read.to_arrow(table_scan.plan().splits())
+ self.assertEqual(result_after.num_rows, 0)
+
+ # Verify new snapshot was created
+ snapshot = self.table.snapshot_manager().get_latest_snapshot()
+ self.assertIsNotNone(snapshot)
+ self.assertEqual(snapshot.commit_kind, 'OVERWRITE')
+
+ def test_truncate_empty_table(self):
+ """Test truncate_table on an empty table."""
+ # Truncate empty table - should still work
+ write_builder = self.table.new_batch_write_builder()
+ table_commit = write_builder.new_commit()
+ table_commit.truncate_table()
+ table_commit.close()
+
+ # Verify table is still empty
+ read_builder = self.table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+ self.assertEqual(result.num_rows, 0)
+
+ def test_truncate_table_multiple_times(self):
+ """Test multiple consecutive truncate operations."""
+ # Write and truncate multiple times
+ for i in range(3):
+ # Write some data
+ write_builder = self.table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ data = pa.Table.from_pydict({
+ 'user_id': [i, i + 1],
+ 'item_id': [i * 100, (i + 1) * 100],
+ 'behavior': ['view', 'click'],
+ 'dt': [f'2024-01-{i+1:02d}', f'2024-01-{i+2:02d}']
+ }, schema=self.pa_schema)
+ table_write.write_arrow(data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Verify data was written
+ read_builder = self.table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+ self.assertEqual(result.num_rows, 2)
+
+ # Truncate
+ write_builder = self.table.new_batch_write_builder()
+ table_commit = write_builder.new_commit()
+ table_commit.truncate_table()
+ table_commit.close()
+
+ # Verify data is truncated
+ read_builder = self.table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+ self.assertEqual(result.num_rows, 0)
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 924e8446f8..829bcaa0b7 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -228,6 +228,17 @@ class FileStoreCommit:
allow_rollback=False,
)
+ def truncate_table(self, commit_identifier: int) -> None:
+ """Truncate the entire table, deleting all data."""
+ self._try_commit(
+ commit_kind="OVERWRITE",
+ commit_identifier=commit_identifier,
+ commit_entries_plan=lambda snapshot: self._generate_overwrite_entries(
+ snapshot, None, []),
+ detect_conflicts=True,
+ allow_rollback=False,
+ )
+
def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan,
detect_conflicts=False, allow_rollback=False):
diff --git a/paimon-python/pypaimon/write/table_commit.py b/paimon-python/pypaimon/write/table_commit.py
index 1eafafefc0..19918c1782 100644
--- a/paimon-python/pypaimon/write/table_commit.py
+++ b/paimon-python/pypaimon/write/table_commit.py
@@ -83,6 +83,11 @@ class BatchTableCommit(TableCommit):
def commit(self, commit_messages: List[CommitMessage]):
self._commit(commit_messages, BATCH_COMMIT_IDENTIFIER)
+ def truncate_table(self) -> None:
+ """Truncate the entire table, deleting all data."""
+ self._check_committed()
+ self.file_store_commit.truncate_table(BATCH_COMMIT_IDENTIFIER)
+
def truncate_partitions(self, partitions: List[Dict[str, str]]) -> None:
self._check_committed()
self.file_store_commit.drop_partitions(partitions, BATCH_COMMIT_IDENTIFIER)