This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3a36fe011a [Python] Support multi prepare commit in the same TableWrite (#6526)
3a36fe011a is described below
commit 3a36fe011acf0ec86bf340cc0675dc5b58f44b47
Author: umi <[email protected]>
AuthorDate: Wed Nov 5 16:00:04 2025 +0800
[Python] Support multi prepare commit in the same TableWrite (#6526)
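
A minimal usage sketch of the new stream write path, distilled from the
tests added by this patch (the warehouse path and table name are
illustrative, not part of the change):

    import pyarrow as pa
    from pypaimon import CatalogFactory, Schema

    catalog = CatalogFactory.create({'warehouse': '/tmp/paimon-demo'})
    catalog.create_database('default', True)

    pa_schema = pa.schema([('user_id', pa.int32()), ('dt', pa.string())])
    schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
    catalog.create_table('default.stream_demo', schema, False)
    table = catalog.get_table('default.stream_demo')

    # New in this commit: a stream write builder whose TableWrite accepts
    # repeated prepare_commit calls, each tagged with a caller-supplied
    # commit identifier, instead of BatchTableWrite's one-shot behavior.
    write_builder = table.new_stream_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()

    messages = []
    for identifier in range(3):
        batch = pa.Table.from_pydict(
            {'user_id': [identifier], 'dt': ['p1']}, schema=pa_schema)
        table_write.write_arrow(batch)
        messages = table_write.prepare_commit(identifier)

    # As in the added tests, a single commit carrying the last identifier
    # publishes the data from all prepared writes.
    table_commit.commit(messages, 2)
    table_write.close()
    table_commit.close()

BatchTableWrite keeps its one-shot behavior and now delegates to the same
code path with COMMIT_IDENTIFIER (Long.MAX_VALUE) as the identifier.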
---
paimon-python/pypaimon/snapshot/snapshot.py | 2 +
paimon-python/pypaimon/table/file_store_table.py | 5 +-
paimon-python/pypaimon/table/table.py | 6 +-
.../pypaimon/tests/py36/ao_simple_test.py | 56 ++++++++
.../pypaimon/tests/write/table_write_test.py | 155 +++++++++++++++++++++
paimon-python/pypaimon/write/file_store_write.py | 5 +-
.../{batch_table_commit.py => table_commit.py} | 19 ++-
.../write/{batch_table_write.py => table_write.py} | 29 ++--
.../{batch_write_builder.py => write_builder.py} | 35 +++--
9 files changed, 287 insertions(+), 25 deletions(-)
diff --git a/paimon-python/pypaimon/snapshot/snapshot.py b/paimon-python/pypaimon/snapshot/snapshot.py
index 96b287ab55..f164f5eb61 100644
--- a/paimon-python/pypaimon/snapshot/snapshot.py
+++ b/paimon-python/pypaimon/snapshot/snapshot.py
@@ -21,6 +21,8 @@ from typing import Dict, Optional
from pypaimon.common.json_util import json_field
+COMMIT_IDENTIFIER = 0x7fffffffffffffff
+
@dataclass
class Snapshot:
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index cde282eff0..6132c9bd69 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -28,7 +28,7 @@ from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.schema.table_schema import TableSchema
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.table.table import Table
-from pypaimon.write.batch_write_builder import BatchWriteBuilder
+from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder
from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor,
FixedBucketRowKeyExtractor,
PostponeBucketRowKeyExtractor,
@@ -98,6 +98,9 @@ class FileStoreTable(Table):
def new_batch_write_builder(self) -> BatchWriteBuilder:
return BatchWriteBuilder(self)
+ def new_stream_write_builder(self) -> StreamWriteBuilder:
+ return StreamWriteBuilder(self)
+
def create_row_key_extractor(self) -> RowKeyExtractor:
bucket_mode = self.bucket_mode()
if bucket_mode == BucketMode.HASH_FIXED:
diff --git a/paimon-python/pypaimon/table/table.py b/paimon-python/pypaimon/table/table.py
index 3a1fe2e622..e20784f1fc 100644
--- a/paimon-python/pypaimon/table/table.py
+++ b/paimon-python/pypaimon/table/table.py
@@ -19,7 +19,7 @@
from abc import ABC, abstractmethod
from pypaimon.read.read_builder import ReadBuilder
-from pypaimon.write.batch_write_builder import BatchWriteBuilder
+from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder
class Table(ABC):
@@ -32,3 +32,7 @@ class Table(ABC):
@abstractmethod
def new_batch_write_builder(self) -> BatchWriteBuilder:
"""Returns a builder for building batch table write and table
commit."""
+
+ @abstractmethod
+ def new_stream_write_builder(self) -> StreamWriteBuilder:
+ """Returns a builder for building stream table write and table
commit."""
diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
index efb3189e06..b14fbcf8db 100644
--- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
@@ -421,3 +421,59 @@ class AOSimpleTest(RESTBaseTest):
session_token="TOKEN",
region="cn-hangzhou",
endpoint_override="oss-bucket." +
props[OssOptions.OSS_ENDPOINT])
+
+ def test_multi_prepare_commit_ao(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+ self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
+ table = self.rest_catalog.get_table('default.test_append_only_parquet')
+ write_builder = table.new_stream_write_builder()
+
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ # write 1
+ data1 = {
+ 'user_id': [1, 2, 3, 4],
+ 'item_id': [1001, 1002, 1003, 1004],
+ 'behavior': ['a', 'b', 'c', None],
+ 'dt': ['p1', 'p1', 'p2', 'p1'],
+ }
+ pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+ table_write.prepare_commit(0)
+ # write 2
+ data2 = {
+ 'user_id': [5, 6, 7, 8],
+ 'item_id': [1005, 1006, 1007, 1008],
+ 'behavior': ['e', 'f', 'g', 'h'],
+ 'dt': ['p2', 'p1', 'p2', 'p2'],
+ }
+ pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+ table_write.prepare_commit(1)
+ # write 3
+ data3 = {
+ 'user_id': [9, 10],
+ 'item_id': [1009, 1010],
+ 'behavior': ['i', 'j'],
+ 'dt': ['p2', 'p1'],
+ }
+ pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+ cm = table_write.prepare_commit(2)
+ # commit
+ table_commit.commit(cm, 2)
+ table_write.close()
+ table_commit.close()
+ self.assertEqual(2, table_write.file_store_write.commit_identifier)
+
+ read_builder = table.new_read_builder()
+ table_read = read_builder.new_read()
+ splits = read_builder.new_scan().plan().splits()
+ actual = table_sort_by(table_read.to_arrow(splits), 'user_id')
+ expected = pa.Table.from_pydict({
+ 'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+ 'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010],
+ 'behavior': ['a', 'b', 'c', None, 'e', 'f', 'g', 'h', 'i', 'j'],
+ 'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p2', 'p2', 'p1']
+ }, schema=self.pa_schema)
+ self.assertEqual(expected, actual)
diff --git a/paimon-python/pypaimon/tests/write/table_write_test.py b/paimon-python/pypaimon/tests/write/table_write_test.py
new file mode 100644
index 0000000000..21b76731ac
--- /dev/null
+++ b/paimon-python/pypaimon/tests/write/table_write_test.py
@@ -0,0 +1,155 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import os
+import shutil
+
+import tempfile
+import unittest
+
+from pypaimon import CatalogFactory, Schema
+import pyarrow as pa
+
+
+class TableWriteTest(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({
+ 'warehouse': cls.warehouse
+ })
+ cls.catalog.create_database('default', True)
+
+ cls.pa_schema = pa.schema([
+ ('user_id', pa.int32()),
+ ('item_id', pa.int64()),
+ ('behavior', pa.string()),
+ ('dt', pa.string())
+ ])
+ cls.expected = pa.Table.from_pydict({
+ 'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+ 'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010],
+ 'behavior': ['a', 'b', 'c', None, 'e', 'f', 'g', 'h', 'i', 'j'],
+ 'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p2', 'p2', 'p1']
+ }, schema=cls.pa_schema)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ def test_multi_prepare_commit_ao(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+ self.catalog.create_table('default.test_append_only_parquet', schema, False)
+ table = self.catalog.get_table('default.test_append_only_parquet')
+ write_builder = table.new_stream_write_builder()
+
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ # write 1
+ data1 = {
+ 'user_id': [1, 2, 3, 4],
+ 'item_id': [1001, 1002, 1003, 1004],
+ 'behavior': ['a', 'b', 'c', None],
+ 'dt': ['p1', 'p1', 'p2', 'p1'],
+ }
+ pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+ table_write.prepare_commit(0)
+ # write 2
+ data2 = {
+ 'user_id': [5, 6, 7, 8],
+ 'item_id': [1005, 1006, 1007, 1008],
+ 'behavior': ['e', 'f', 'g', 'h'],
+ 'dt': ['p2', 'p1', 'p2', 'p2'],
+ }
+ pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+ table_write.prepare_commit(1)
+ # write 3
+ data3 = {
+ 'user_id': [9, 10],
+ 'item_id': [1009, 1010],
+ 'behavior': ['i', 'j'],
+ 'dt': ['p2', 'p1'],
+ }
+ pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+ cm = table_write.prepare_commit(2)
+ # commit
+ table_commit.commit(cm, 2)
+ table_write.close()
+ table_commit.close()
+ self.assertEqual(2, table_write.file_store_write.commit_identifier)
+
+ read_builder = table.new_read_builder()
+ table_read = read_builder.new_read()
+ splits = read_builder.new_scan().plan().splits()
+ actual = table_read.to_arrow(splits).sort_by('user_id')
+ self.assertEqual(self.expected, actual)
+
+ def test_multi_prepare_commit_pk(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], primary_keys=['user_id', 'dt'],
+ options={'bucket': '2'})
+ self.catalog.create_table('default.test_primary_key_parquet', schema, False)
+ table = self.catalog.get_table('default.test_primary_key_parquet')
+ write_builder = table.new_stream_write_builder()
+
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ # write 1
+ data1 = {
+ 'user_id': [1, 2, 3, 4],
+ 'item_id': [1001, 1002, 1003, 1004],
+ 'behavior': ['a', 'b', 'c', None],
+ 'dt': ['p1', 'p1', 'p2', 'p1'],
+ }
+ pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+ table_write.prepare_commit(0)
+ # write 2
+ data2 = {
+ 'user_id': [5, 6, 7, 8],
+ 'item_id': [1005, 1006, 1007, 1008],
+ 'behavior': ['e', 'f', 'g', 'h'],
+ 'dt': ['p2', 'p1', 'p2', 'p2'],
+ }
+ pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+ table_write.prepare_commit(1)
+ # write 3
+ data3 = {
+ 'user_id': [9, 10],
+ 'item_id': [1009, 1010],
+ 'behavior': ['i', 'j'],
+ 'dt': ['p2', 'p1'],
+ }
+ pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+ cm = table_write.prepare_commit(2)
+ # commit
+ table_commit.commit(cm, 2)
+ table_write.close()
+ table_commit.close()
+ self.assertEqual(2, table_write.file_store_write.commit_identifier)
+
+ read_builder = table.new_read_builder()
+ table_read = read_builder.new_read()
+ splits = read_builder.new_scan().plan().splits()
+ actual = table_read.to_arrow(splits).sort_by('user_id')
+ self.assertEqual(self.expected, actual)
diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
index 9ebabf1103..c100b64966 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -37,6 +37,7 @@ class FileStoreWrite:
self.data_writers: Dict[Tuple, DataWriter] = {}
self.max_seq_numbers: dict = {}
self.write_cols = None
+ self.commit_identifier = 0
def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
key = (partition, bucket)
@@ -48,6 +49,7 @@ class FileStoreWrite:
def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter:
def max_seq_number():
return self._seq_number_stats(partition).get(bucket, 1)
+
# Check if table has blob columns
if self._has_blob_columns():
return DataBlobWriter(
@@ -83,7 +85,8 @@ class FileStoreWrite:
return True
return False
- def prepare_commit(self) -> List[CommitMessage]:
+ def prepare_commit(self, commit_identifier) -> List[CommitMessage]:
+ self.commit_identifier = commit_identifier
commit_messages = []
for (partition, bucket), writer in self.data_writers.items():
committed_files = writer.prepare_commit()
diff --git a/paimon-python/pypaimon/write/batch_table_commit.py b/paimon-python/pypaimon/write/table_commit.py
similarity index 86%
rename from paimon-python/pypaimon/write/batch_table_commit.py
rename to paimon-python/pypaimon/write/table_commit.py
index 7f42e1cef1..0dcfabf99e 100644
--- a/paimon-python/pypaimon/write/batch_table_commit.py
+++ b/paimon-python/pypaimon/write/table_commit.py
@@ -16,14 +16,14 @@
# limitations under the License.
################################################################################
-import time
from typing import List, Optional
+from pypaimon.snapshot.snapshot import COMMIT_IDENTIFIER
from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.file_store_commit import FileStoreCommit
-class BatchTableCommit:
+class TableCommit:
"""Python implementation of BatchTableCommit for batch writing
scenarios."""
def __init__(self, table, commit_user: str, static_partition: Optional[dict]):
@@ -41,15 +41,13 @@ class BatchTableCommit:
self.file_store_commit = FileStoreCommit(snapshot_commit, table, commit_user)
self.batch_committed = False
- def commit(self, commit_messages: List[CommitMessage]):
+ def _commit(self, commit_messages: List[CommitMessage], commit_identifier: int = COMMIT_IDENTIFIER):
self._check_committed()
non_empty_messages = [msg for msg in commit_messages if not msg.is_empty()]
if not non_empty_messages:
return
- commit_identifier = int(time.time() * 1000)
-
try:
if self.overwrite_partition is not None:
self.file_store_commit.overwrite(
@@ -76,3 +74,14 @@ class BatchTableCommit:
if self.batch_committed:
raise RuntimeError("BatchTableCommit only supports one-time
committing.")
self.batch_committed = True
+
+
+class BatchTableCommit(TableCommit):
+ def commit(self, commit_messages: List[CommitMessage]):
+ self._commit(commit_messages, COMMIT_IDENTIFIER)
+
+
+class StreamTableCommit(TableCommit):
+
+ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int = COMMIT_IDENTIFIER):
+ self._commit(commit_messages, commit_identifier)
diff --git a/paimon-python/pypaimon/write/batch_table_write.py b/paimon-python/pypaimon/write/table_write.py
similarity index 89%
rename from paimon-python/pypaimon/write/batch_table_write.py
rename to paimon-python/pypaimon/write/table_write.py
index f8d0660bfa..3ccc078223 100644
--- a/paimon-python/pypaimon/write/batch_table_write.py
+++ b/paimon-python/pypaimon/write/table_write.py
@@ -15,18 +15,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
from collections import defaultdict
from typing import List
import pyarrow as pa
from pypaimon.schema.data_types import PyarrowFieldParser
+from pypaimon.snapshot.snapshot import COMMIT_IDENTIFIER
from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.file_store_write import FileStoreWrite
-class BatchTableWrite:
+class TableWrite:
def __init__(self, table):
from pypaimon.table.file_store_table import FileStoreTable
@@ -34,7 +34,6 @@ class BatchTableWrite:
self.table_pyarrow_schema = PyarrowFieldParser.from_paimon_schema(self.table.table_schema.fields)
self.file_store_write = FileStoreWrite(self.table)
self.row_key_extractor = self.table.create_row_key_extractor()
- self.batch_committed = False
def write_arrow(self, table: pa.Table):
batches_iterator = table.to_batches()
@@ -59,12 +58,6 @@ class BatchTableWrite:
record_batch = pa.RecordBatch.from_pandas(dataframe, schema=pa_schema)
return self.write_arrow_batch(record_batch)
- def prepare_commit(self) -> List[CommitMessage]:
- if self.batch_committed:
- raise RuntimeError("BatchTableWrite only supports one-time
committing.")
- self.batch_committed = True
- return self.file_store_write.prepare_commit()
-
def with_write_type(self, write_cols: List[str]):
for col in write_cols:
if col not in self.table_pyarrow_schema.names:
@@ -83,3 +76,21 @@ class BatchTableWrite:
f"Input schema is: {data_schema} "
f"Table schema is: {self.table_pyarrow_schema} "
f"Write cols is:
{self.file_store_write.write_cols}")
+
+
+class BatchTableWrite(TableWrite):
+ def __init__(self, table):
+ super().__init__(table)
+ self.batch_committed = False
+
+ def prepare_commit(self) -> List[CommitMessage]:
+ if self.batch_committed:
+ raise RuntimeError("BatchTableWrite only supports one-time
committing.")
+ self.batch_committed = True
+ return self.file_store_write.prepare_commit(COMMIT_IDENTIFIER)
+
+
+class StreamTableWrite(TableWrite):
+
+ def prepare_commit(self, commit_identifier: int = COMMIT_IDENTIFIER) ->
List[CommitMessage]:
+ return self.file_store_write.prepare_commit(commit_identifier)
diff --git a/paimon-python/pypaimon/write/batch_write_builder.py b/paimon-python/pypaimon/write/write_builder.py
similarity index 73%
rename from paimon-python/pypaimon/write/batch_write_builder.py
rename to paimon-python/pypaimon/write/write_builder.py
index 2380530fbc..8c9ed725f5 100644
--- a/paimon-python/pypaimon/write/batch_write_builder.py
+++ b/paimon-python/pypaimon/write/write_builder.py
@@ -17,14 +17,15 @@
################################################################################
import uuid
+from abc import ABC
from typing import Optional
from pypaimon.common.core_options import CoreOptions
-from pypaimon.write.batch_table_commit import BatchTableCommit
-from pypaimon.write.batch_table_write import BatchTableWrite
+from pypaimon.write.table_commit import BatchTableCommit, StreamTableCommit, TableCommit
+from pypaimon.write.table_write import BatchTableWrite, StreamTableWrite, TableWrite
-class BatchWriteBuilder:
+class WriteBuilder(ABC):
def __init__(self, table):
from pypaimon.table.file_store_table import FileStoreTable
@@ -36,15 +37,33 @@ class BatchWriteBuilder:
self.static_partition = static_partition if static_partition is not None else {}
return self
- def new_write(self) -> BatchTableWrite:
- return BatchTableWrite(self.table)
+ def new_write(self) -> TableWrite:
+ """Returns a table write."""
- def new_commit(self) -> BatchTableCommit:
- commit = BatchTableCommit(self.table, self.commit_user, self.static_partition)
- return commit
+ def new_commit(self) -> TableCommit:
+ """Returns a table commit."""
def _create_commit_user(self):
if CoreOptions.COMMIT_USER_PREFIX in self.table.options:
return f"{self.table.options.get(CoreOptions.COMMIT_USER_PREFIX)}_{uuid.uuid4()}"
else:
return str(uuid.uuid4())
+
+
+class BatchWriteBuilder(WriteBuilder):
+
+ def new_write(self) -> BatchTableWrite:
+ return BatchTableWrite(self.table)
+
+ def new_commit(self) -> BatchTableCommit:
+ commit = BatchTableCommit(self.table, self.commit_user, self.static_partition)
+ return commit
+
+
+class StreamWriteBuilder(WriteBuilder):
+ def new_write(self) -> StreamTableWrite:
+ return StreamTableWrite(self.table)
+
+ def new_commit(self) -> StreamTableCommit:
+ commit = StreamTableCommit(self.table, self.commit_user, self.static_partition)
+ return commit