This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-python.git
The following commit(s) were added to refs/heads/main by this push:
new 56bcb5e Introduce batch write api (#4)
56bcb5e is described below
commit 56bcb5e30bc939ff8144a3ce88552fdb6e779fa6
Author: yuzelin <[email protected]>
AuthorDate: Fri Aug 16 17:46:25 2024 +0800
Introduce batch write api (#4)
---
.github/workflows/paimon-python-checks.yml | 2 +
dev/lint-python.sh | 13 +++--
java_based_implementation/api_impl.py | 61 +++++++++++++++++++++--
paimon_python_api/{table.py => commit_message.py} | 11 ++--
paimon_python_api/table.py | 5 ++
paimon_python_api/{table.py => table_commit.py} | 14 ++++--
paimon_python_api/{table.py => table_write.py} | 16 ++++--
paimon_python_api/{table.py => write_builder.py} | 23 +++++++--
8 files changed, 115 insertions(+), 30 deletions(-)
diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index 13c5c26..1e85b1b 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -35,6 +35,8 @@ jobs:
runs-on: ubuntu-latest
steps:
+ - name: Checkout code
+ uses: actions/checkout@v4
- name: Run lint-python.sh
run: |
chmod +x dev/lint-python.sh
diff --git a/dev/lint-python.sh b/dev/lint-python.sh
index 4f7f89d..a287853 100755
--- a/dev/lint-python.sh
+++ b/dev/lint-python.sh
@@ -149,9 +149,9 @@ function get_os_index() {
local sys_os=$(uname -s)
echo "Detected OS: ${sys_os}"
if [ ${sys_os} == "Darwin" ]; then
- return 0
+ echo 0
elif [[ ${sys_os} == "Linux" ]]; then
- return 1
+ echo 1
else
echo "Unsupported OS: ${sys_os}"
exit 1
@@ -360,8 +360,20 @@ function install_environment() {
print_function "STAGE" "installing environment"
#get the index of the SUPPORT_OS array for convenient to install tool.
- get_os_index $sys_os
- local os_index=$?
+ # get_os_index prints diagnostics before the index, so keep only its last line. Its `exit 1`
+ # only leaves the command-substitution subshell, so the index must be validated here.
+ local os_index
+ os_index=$(get_os_index | tail -n1)
+ if [[ "${os_index}" != "0" && "${os_index}" != "1" ]]; then
+ echo "Cannot install environment, unsupported OS: ${os_index}"
+ exit 1
+ fi
+
+ # In some Linux distributions, md5sum is installed instead of md5. But our miniconda installation shell uses md5
+ if [ "$os_index" -eq 1 ] && [ ! -f /usr/local/bin/md5 ]; then
+ echo "Creating symlink for md5 to md5sum..."
+ sudo ln -s "$(which md5sum)" /usr/local/bin/md5
+ fi
# step-1 install wget
# the file size of the miniconda.sh is too big to use "wget" tool to download instead
diff --git a/java_based_implementation/api_impl.py b/java_based_implementation/api_impl.py
index f0b35f1..8f8e87f 100644
--- a/java_based_implementation/api_impl.py
+++ b/java_based_implementation/api_impl.py
@@ -18,9 +18,9 @@
from java_based_implementation.java_gateway import get_gateway
from java_based_implementation.util.java_utils import to_j_catalog_context
-from paimon_python_api import catalog, read_builder, table_scan, split, table_read
-from paimon_python_api import table
-from pyarrow import RecordBatchReader
+from paimon_python_api import (catalog, table, read_builder, table_scan, split, table_read,
+ write_builder, table_write, commit_message, table_commit)
+from pyarrow import RecordBatchReader, RecordBatch
from typing import List
from typing_extensions import Self
@@ -53,6 +53,10 @@ class Table(table.Table):
j_read_builder = self._j_table.newReadBuilder()
return ReadBuilder(j_read_builder)
+ def new_batch_write_builder(self) -> 'BatchWriteBuilder':
+ j_batch_write_builder = self._j_table.newBatchWriteBuilder()
+ return BatchWriteBuilder(j_batch_write_builder)
+
class ReadBuilder(read_builder.ReadBuilder):
@@ -110,3 +114,56 @@ class TableRead(table_read.TableRead):
def create_reader(self, split: Split) -> RecordBatchReader:
# TODO
pass
+
+
+class BatchWriteBuilder(write_builder.BatchWriteBuilder):
+
+ def __init__(self, j_batch_write_builder):
+ self._j_batch_write_builder = j_batch_write_builder
+
+ def with_overwrite(self, static_partition: dict) -> Self:
+ # NOTE(review): needs py4j to convert dict -> java.util.Map; check java_gateway.
+ self._j_batch_write_builder.withOverwrite(static_partition)
+ return self
+
+ def new_write(self) -> 'BatchTableWrite':
+ j_batch_table_write = self._j_batch_write_builder.newWrite()
+ return BatchTableWrite(j_batch_table_write)
+
+ def new_commit(self) -> 'BatchTableCommit':
+ j_batch_table_commit = self._j_batch_write_builder.newCommit()
+ return BatchTableCommit(j_batch_table_commit)
+
+
+class BatchTableWrite(table_write.BatchTableWrite):
+
+ def __init__(self, j_batch_table_write):
+ self._j_batch_table_write = j_batch_table_write
+
+ def write(self, record_batch: RecordBatch):
+ # TODO
+ pass
+
+ def prepare_commit(self) -> List['CommitMessage']:
+ j_commit_messages = self._j_batch_table_write.prepareCommit()
+ return [CommitMessage(j_commit_message) for j_commit_message in j_commit_messages]
+
+
+class CommitMessage(commit_message.CommitMessage):
+
+ def __init__(self, j_commit_message):
+ self._j_commit_message = j_commit_message
+
+ def to_j_commit_message(self):
+ return self._j_commit_message
+
+
+class BatchTableCommit(table_commit.BatchTableCommit):
+
+ def __init__(self, j_batch_table_commit):
+ self._j_batch_table_commit = j_batch_table_commit
+
+ def commit(self, commit_messages: List[CommitMessage]):
+ # NOTE(review): needs py4j to convert list -> java.util.List; check java_gateway.
+ j_commit_messages = [message.to_j_commit_message() for message in commit_messages]
+ self._j_batch_table_commit.commit(j_commit_messages)
diff --git a/paimon_python_api/table.py b/paimon_python_api/commit_message.py
similarity index 76%
copy from paimon_python_api/table.py
copy to paimon_python_api/commit_message.py
index 11d1bde..6f2534e 100644
--- a/paimon_python_api/table.py
+++ b/paimon_python_api/commit_message.py
@@ -16,13 +16,8 @@
# limitations under the License.
#################################################################################
-from abc import ABC, abstractmethod
-from read_builder import ReadBuilder
+from abc import ABC
-class Table(ABC):
- """A table provides basic abstraction for table read and write."""
-
- @abstractmethod
- def new_read_builder(self) -> ReadBuilder:
- """Return a builder for building table scan and table read."""
+class CommitMessage(ABC):
+ """Commit message collected from a table write, to be passed to a table commit."""
diff --git a/paimon_python_api/table.py b/paimon_python_api/table.py
index 11d1bde..f325cf6 100644
--- a/paimon_python_api/table.py
+++ b/paimon_python_api/table.py
@@ -18,6 +18,7 @@
from abc import ABC, abstractmethod
-from read_builder import ReadBuilder
+from paimon_python_api.read_builder import ReadBuilder
+from paimon_python_api.write_builder import BatchWriteBuilder
class Table(ABC):
@@ -26,3 +27,7 @@ class Table(ABC):
@abstractmethod
def new_read_builder(self) -> ReadBuilder:
"""Return a builder for building table scan and table read."""
+
+ @abstractmethod
+ def new_batch_write_builder(self) -> BatchWriteBuilder:
+ """Return a builder for building batch table write and table commit."""
diff --git a/paimon_python_api/table.py b/paimon_python_api/table_commit.py
similarity index 70%
copy from paimon_python_api/table.py
copy to paimon_python_api/table_commit.py
index 11d1bde..bdbbe3b 100644
--- a/paimon_python_api/table.py
+++ b/paimon_python_api/table_commit.py
@@ -17,12 +17,16 @@
#################################################################################
from abc import ABC, abstractmethod
-from read_builder import ReadBuilder
+from paimon_python_api.commit_message import CommitMessage
+from typing import List
-class Table(ABC):
- """A table provides basic abstraction for table read and write."""
+class BatchTableCommit(ABC):
+ """A table commit for batch processing. Recommended for one-time committing."""
@abstractmethod
- def new_read_builder(self) -> ReadBuilder:
- """Return a builder for building table scan and table read."""
+ def commit(self, commit_messages: List[CommitMessage]):
+ """
+ Commit the commit messages to generate snapshots. One commit may generate
+ up to two snapshots, one for adding new files and the other for compaction.
+ """
diff --git a/paimon_python_api/table.py b/paimon_python_api/table_write.py
similarity index 68%
copy from paimon_python_api/table.py
copy to paimon_python_api/table_write.py
index 11d1bde..b6ae4ce 100644
--- a/paimon_python_api/table.py
+++ b/paimon_python_api/table_write.py
@@ -17,12 +17,18 @@
#################################################################################
from abc import ABC, abstractmethod
-from read_builder import ReadBuilder
+from paimon_python_api.commit_message import CommitMessage
+from pyarrow import RecordBatch
+from typing import List
-class Table(ABC):
- """A table provides basic abstraction for table read and write."""
+class BatchTableWrite(ABC):
+ """A table write for batch processing. Recommended for one-time committing."""
@abstractmethod
- def new_read_builder(self) -> ReadBuilder:
- """Return a builder for building table scan and table read."""
+ def write(self, record_batch: RecordBatch):
+ """Write a batch to the writer."""
+
+ @abstractmethod
+ def prepare_commit(self) -> List[CommitMessage]:
+ """Prepare commit messages for BatchTableCommit. Collect incremental files for this writer."""
diff --git a/paimon_python_api/table.py b/paimon_python_api/write_builder.py
similarity index 58%
copy from paimon_python_api/table.py
copy to paimon_python_api/write_builder.py
index 11d1bde..1757e27 100644
--- a/paimon_python_api/table.py
+++ b/paimon_python_api/write_builder.py
@@ -17,12 +17,25 @@
#################################################################################
from abc import ABC, abstractmethod
-from read_builder import ReadBuilder
+from paimon_python_api.table_commit import BatchTableCommit
+from paimon_python_api.table_write import BatchTableWrite
+from typing_extensions import Self
-class Table(ABC):
- """A table provides basic abstraction for table read and write."""
+class BatchWriteBuilder(ABC):
+ """An interface for building the BatchTableWrite and BatchTableCommit."""
@abstractmethod
- def new_read_builder(self) -> ReadBuilder:
- """Return a builder for building table scan and table read."""
+ def with_overwrite(self, static_partition: dict) -> Self:
+ """
+ Overwrite writing, same as the 'INSERT OVERWRITE T PARTITION (...)' semantics of SQL.
+ Passing an empty dict overwrites the whole table.
+ """
+
+ @abstractmethod
+ def new_write(self) -> BatchTableWrite:
+ """Create a BatchTableWrite to perform batch writing."""
+
+ @abstractmethod
+ def new_commit(self) -> BatchTableCommit:
+ """Create a BatchTableCommit to perform batch committing."""