This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 56bcb5e  Introduce batch write api (#4)
56bcb5e is described below

commit 56bcb5e30bc939ff8144a3ce88552fdb6e779fa6
Author: yuzelin <[email protected]>
AuthorDate: Fri Aug 16 17:46:25 2024 +0800

    Introduce batch write api (#4)
---
 .github/workflows/paimon-python-checks.yml        |  2 +
 dev/lint-python.sh                                | 13 +++--
 java_based_implementation/api_impl.py             | 61 +++++++++++++++++++++--
 paimon_python_api/{table.py => commit_message.py} | 11 ++--
 paimon_python_api/table.py                        |  5 ++
 paimon_python_api/{table.py => table_commit.py}   | 14 ++++--
 paimon_python_api/{table.py => table_write.py}    | 16 ++++--
 paimon_python_api/{table.py => write_builder.py}  | 23 +++++++--
 8 files changed, 115 insertions(+), 30 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index 13c5c26..1e85b1b 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -35,6 +35,8 @@ jobs:
     runs-on: ubuntu-latest
 
     steps:
+      - name: Checkout code
+        uses: actions/checkout@v2
       - name: Run lint-python.sh
         run: |
           chmod +x dev/lint-python.sh
diff --git a/dev/lint-python.sh b/dev/lint-python.sh
index 4f7f89d..a287853 100755
--- a/dev/lint-python.sh
+++ b/dev/lint-python.sh
@@ -149,9 +149,9 @@ function get_os_index() {
     local sys_os=$(uname -s)
     echo "Detected OS: ${sys_os}"
     if [ ${sys_os} == "Darwin" ]; then
-        return 0
+        echo 0
     elif [[ ${sys_os} == "Linux" ]]; then
-        return 1
+        echo 1
     else
         echo "Unsupported OS: ${sys_os}"
         exit 1
@@ -360,8 +360,13 @@ function install_environment() {
     print_function "STAGE" "installing environment"
 
     #get the index of the SUPPORT_OS array for convenient to install tool.
-    get_os_index $sys_os
-    local os_index=$?
+    local os_index=$(get_os_index | tail -n1)
+
+    # In some Linux distributions, md5sum is installed instead of md5. But our miniconda installation shell uses md5
+    if [ "$os_index" -eq 1 ] && [ ! -f /usr/local/bin/md5 ]; then
+       echo "Creating symlink for md5 to md5sum..."
+       sudo ln -s $(which md5sum) /usr/local/bin/md5
+    fi
 
     # step-1 install wget
     # the file size of the miniconda.sh is too big to use "wget" tool to download instead
diff --git a/java_based_implementation/api_impl.py b/java_based_implementation/api_impl.py
index f0b35f1..8f8e87f 100644
--- a/java_based_implementation/api_impl.py
+++ b/java_based_implementation/api_impl.py
@@ -18,9 +18,9 @@
 
 from java_based_implementation.java_gateway import get_gateway
 from java_based_implementation.util.java_utils import to_j_catalog_context
-from paimon_python_api import catalog, read_builder, table_scan, split, 
table_read
-from paimon_python_api import table
-from pyarrow import RecordBatchReader
+from paimon_python_api import (catalog, table, read_builder, table_scan, 
split, table_read,
+                               write_builder, table_write, commit_message, 
table_commit)
+from pyarrow import RecordBatchReader, RecordBatch
 from typing import List
 from typing_extensions import Self
 
@@ -53,6 +53,10 @@ class Table(table.Table):
         j_read_builder = self._j_table.newReadBuilder()
         return ReadBuilder(j_read_builder)
 
+    def new_batch_write_builder(self) -> 'BatchWriteBuilder':
+        j_batch_write_builder = self._j_table.newBatchWriteBuilder()
+        return BatchWriteBuilder(j_batch_write_builder)
+
 
 class ReadBuilder(read_builder.ReadBuilder):
 
@@ -110,3 +114,54 @@ class TableRead(table_read.TableRead):
     def create_reader(self, split: Split) -> RecordBatchReader:
         # TODO
         pass
+
+
+class BatchWriteBuilder(write_builder.BatchWriteBuilder):
+
+    def __init__(self, j_batch_write_builder):
+        self._j_batch_write_builder = j_batch_write_builder
+
+    def with_overwrite(self, static_partition: dict) -> Self:
+        self._j_batch_write_builder.withOverwrite(static_partition)
+        return self
+
+    def new_write(self) -> 'BatchTableWrite':
+        j_batch_table_write = self._j_batch_write_builder.newWrite()
+        return BatchTableWrite(j_batch_table_write)
+
+    def new_commit(self) -> 'BatchTableCommit':
+        j_batch_table_commit = self._j_batch_write_builder.newCommit()
+        return BatchTableCommit(j_batch_table_commit)
+
+
+class BatchTableWrite(table_write.BatchTableWrite):
+
+    def __init__(self, j_batch_table_write):
+        self._j_batch_table_write = j_batch_table_write
+
+    def write(self, record_batch: RecordBatch):
+        # TODO
+        pass
+
+    def prepare_commit(self) -> List['CommitMessage']:
+        j_commit_messages = self._j_batch_table_write.prepareCommit()
+        return list(map(lambda cm: CommitMessage(cm), j_commit_messages))
+
+
+class CommitMessage(commit_message.CommitMessage):
+
+    def __init__(self, j_commit_message):
+        self._j_commit_message = j_commit_message
+
+    def to_j_commit_message(self):
+        return self._j_commit_message
+
+
+class BatchTableCommit(table_commit.BatchTableCommit):
+
+    def __init__(self, j_batch_table_commit):
+        self._j_batch_table_commit = j_batch_table_commit
+
+    def commit(self, commit_messages: List[CommitMessage]):
+        j_commit_messages = list(map(lambda cm: cm.to_j_commit_message(), 
commit_messages))
+        self._j_batch_table_commit.commit(j_commit_messages)
diff --git a/paimon_python_api/table.py b/paimon_python_api/commit_message.py
similarity index 76%
copy from paimon_python_api/table.py
copy to paimon_python_api/commit_message.py
index 11d1bde..6f2534e 100644
--- a/paimon_python_api/table.py
+++ b/paimon_python_api/commit_message.py
@@ -16,13 +16,8 @@
 # limitations under the License.
 
#################################################################################
 
-from abc import ABC, abstractmethod
-from read_builder import ReadBuilder
+from abc import ABC
 
 
-class Table(ABC):
-    """A table provides basic abstraction for table read and write."""
-
-    @abstractmethod
-    def new_read_builder(self) -> ReadBuilder:
-        """Return a builder for building table scan and table read."""
+class CommitMessage(ABC):
+    """Commit message collected from writer."""
diff --git a/paimon_python_api/table.py b/paimon_python_api/table.py
index 11d1bde..f325cf6 100644
--- a/paimon_python_api/table.py
+++ b/paimon_python_api/table.py
@@ -18,6 +18,7 @@
 
 from abc import ABC, abstractmethod
 from read_builder import ReadBuilder
+from write_builder import BatchWriteBuilder
 
 
 class Table(ABC):
@@ -26,3 +27,7 @@ class Table(ABC):
     @abstractmethod
     def new_read_builder(self) -> ReadBuilder:
         """Return a builder for building table scan and table read."""
+
+    @abstractmethod
+    def new_batch_write_builder(self) -> BatchWriteBuilder:
+        """Returns a builder for building batch table write and table commit."""
diff --git a/paimon_python_api/table.py b/paimon_python_api/table_commit.py
similarity index 70%
copy from paimon_python_api/table.py
copy to paimon_python_api/table_commit.py
index 11d1bde..bdbbe3b 100644
--- a/paimon_python_api/table.py
+++ b/paimon_python_api/table_commit.py
@@ -17,12 +17,16 @@
 
#################################################################################
 
 from abc import ABC, abstractmethod
-from read_builder import ReadBuilder
+from commit_message import CommitMessage
+from typing import List
 
 
-class Table(ABC):
-    """A table provides basic abstraction for table read and write."""
+class BatchTableCommit(ABC):
+    """A table commit for batch processing. Recommended for one-time committing."""
 
     @abstractmethod
-    def new_read_builder(self) -> ReadBuilder:
-        """Return a builder for building table scan and table read."""
+    def commit(self, commit_messages: List[CommitMessage]):
+        """
+        Commit the commit messages to generate snapshots. One commit may generate
+        up to two snapshots, one for adding new files and the other for compaction.
+        """
diff --git a/paimon_python_api/table.py b/paimon_python_api/table_write.py
similarity index 68%
copy from paimon_python_api/table.py
copy to paimon_python_api/table_write.py
index 11d1bde..b6ae4ce 100644
--- a/paimon_python_api/table.py
+++ b/paimon_python_api/table_write.py
@@ -17,12 +17,18 @@
 
#################################################################################
 
 from abc import ABC, abstractmethod
-from read_builder import ReadBuilder
+from commit_message import CommitMessage
+from pyarrow import RecordBatch
+from typing import List
 
 
-class Table(ABC):
-    """A table provides basic abstraction for table read and write."""
+class BatchTableWrite(ABC):
+    """A table write for batch processing. Recommended for one-time committing."""
 
     @abstractmethod
-    def new_read_builder(self) -> ReadBuilder:
-        """Return a builder for building table scan and table read."""
+    def write(self, record_batch: RecordBatch):
+        """Write a batch to the writer."""
+
+    @abstractmethod
+    def prepare_commit(self) -> List[CommitMessage]:
+        """Prepare commit message for TableCommit. Collect incremental files for this writer."""
diff --git a/paimon_python_api/table.py b/paimon_python_api/write_builder.py
similarity index 58%
copy from paimon_python_api/table.py
copy to paimon_python_api/write_builder.py
index 11d1bde..1757e27 100644
--- a/paimon_python_api/table.py
+++ b/paimon_python_api/write_builder.py
@@ -17,12 +17,25 @@
 
#################################################################################
 
 from abc import ABC, abstractmethod
-from read_builder import ReadBuilder
+from table_commit import BatchTableCommit
+from table_write import BatchTableWrite
+from typing_extensions import Self
 
 
-class Table(ABC):
-    """A table provides basic abstraction for table read and write."""
+class BatchWriteBuilder(ABC):
+    """An interface for building the BatchTableWrite and BatchTableCommit."""
 
     @abstractmethod
-    def new_read_builder(self) -> ReadBuilder:
-        """Return a builder for building table scan and table read."""
+    def with_overwrite(self, static_partition: dict) -> Self:
+        """
+        Overwrite writing, same as the 'INSERT OVERWRITE T PARTITION (...)' semantics of SQL.
+        If you pass an empty dict, it means OVERWRITE whole table.
+        """
+
+    @abstractmethod
+    def new_write(self) -> BatchTableWrite:
+        """Create a BatchTableWrite to perform batch writing."""
+
+    @abstractmethod
+    def new_commit(self) -> BatchTableCommit:
+        """Create a BatchTableCommit to perform batch committing."""

Reply via email to