This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ec5e81a6a [python] Support table write for PyPaimon (#6001)
4ec5e81a6a is described below

commit 4ec5e81a6aa967b8e496c9d6a7bae8b0c70c73b7
Author: ChengHui Chen <27797326+chenghuic...@users.noreply.github.com>
AuthorDate: Fri Aug 1 10:08:11 2025 +0800

    [python] Support table write for PyPaimon (#6001)
---
 .github/workflows/paimon-python-checks.yml         |   2 +-
 paimon-python/pypaimon/api/__init__.py             |   2 +-
 paimon-python/pypaimon/api/auth.py                 |   2 +-
 paimon-python/pypaimon/api/token_loader.py         |   4 +-
 paimon-python/pypaimon/catalog/catalog_utils.py    |   2 +-
 .../pypaimon/catalog/filesystem_catalog.py         |   5 +-
 .../pypaimon/catalog/rest/rest_catalog.py          |   2 +-
 paimon-python/pypaimon/{api => common}/config.py   |   9 +
 .../pypaimon/{api => common}/core_options.py       |   2 +
 paimon-python/pypaimon/common/file_io.py           | 311 +++++++++++++++++++--
 .../pypaimon/manifest/manifest_file_manager.py     |   3 +-
 .../pypaimon/manifest/manifest_list_manager.py     |   7 +-
 paimon-python/pypaimon/pvfs/__init__.py            |   2 +-
 paimon-python/pypaimon/read/read_builder.py        |   7 +-
 paimon-python/pypaimon/read/table_scan.py          |   7 +-
 paimon-python/pypaimon/schema/table_schema.py      |   2 +-
 .../pypaimon/snapshot/snapshot_manager.py          |   7 +-
 paimon-python/pypaimon/table/file_store_table.py   |  21 +-
 paimon-python/pypaimon/table/table.py              |  13 +-
 .../pypaimon/tests/filesystem_catalog_test.py      |  83 ++++++
 paimon-python/pypaimon/tests/writer_test.py        |  78 ++++++
 paimon-python/pypaimon/write/batch_table_commit.py |  72 +++++
 paimon-python/pypaimon/write/batch_table_write.py  |  63 +++++
 .../pypaimon/write/batch_write_builder.py          |  51 ++++
 .../core_options.py => write/commit_message.py}    |  55 ++--
 paimon-python/pypaimon/write/file_store_commit.py  | 120 ++++++++
 paimon-python/pypaimon/write/file_store_write.py   |  75 +++++
 paimon-python/pypaimon/write/row_key_extractor.py  | 102 +++++++
 paimon-python/pypaimon/write/writer/data_writer.py |   9 +-
 .../pypaimon/write/writer/key_value_data_writer.py |  10 +-
 paimon-python/setup.py                             |   1 +
 31 files changed, 1039 insertions(+), 90 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index 2a3a22e1f6..de6c35dac4 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -45,7 +45,7 @@ jobs:
           python-version: ${{ env.PYTHON_VERSION }}
       - name: Install dependencies
         run: |
-          python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 pyarrow==15.0.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 requests 2>&1 >/dev/null
+          python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==15.0.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 requests 2>&1 >/dev/null
       - name: Run lint-python.sh
         run: |
           chmod +x paimon-python/dev/lint-python.sh
diff --git a/paimon-python/pypaimon/api/__init__.py b/paimon-python/pypaimon/api/__init__.py
index 509c4d91a2..4c99ff6ed0 100644
--- a/paimon-python/pypaimon/api/__init__.py
+++ b/paimon-python/pypaimon/api/__init__.py
@@ -31,7 +31,7 @@ from pypaimon.api.api_response import (
 )
 from pypaimon.api.api_resquest import CreateDatabaseRequest, AlterDatabaseRequest, RenameTableRequest, \
     CreateTableRequest
-from pypaimon.api.config import CatalogOptions
+from pypaimon.common.config import CatalogOptions
 from pypaimon.api.client import HttpClient
 from pypaimon.common.identifier import Identifier
 from pypaimon.api.typedef import T
diff --git a/paimon-python/pypaimon/api/auth.py b/paimon-python/pypaimon/api/auth.py
index 393fa15920..eef71e3664 100644
--- a/paimon-python/pypaimon/api/auth.py
+++ b/paimon-python/pypaimon/api/auth.py
@@ -27,7 +27,7 @@ from typing import Optional, Dict
 
 from .token_loader import DLFTokenLoader, DLFToken
 from .typedef import RESTAuthParameter
-from .config import CatalogOptions
+from pypaimon.common.config import CatalogOptions
 
 
 class AuthProvider(ABC):
diff --git a/paimon-python/pypaimon/api/token_loader.py b/paimon-python/pypaimon/api/token_loader.py
index 9214345fce..bfc1ef173e 100644
--- a/paimon-python/pypaimon/api/token_loader.py
+++ b/paimon-python/pypaimon/api/token_loader.py
@@ -25,7 +25,7 @@ from requests.adapters import HTTPAdapter
 from requests.exceptions import RequestException
 
 from pypaimon.common.rest_json import json_field, JSON
-from .config import CatalogOptions
+from pypaimon.common.config import CatalogOptions
 from .client import ExponentialRetry
 
 
@@ -59,7 +59,7 @@ class DLFToken:
 
     @classmethod
     def from_options(cls, options: Dict[str, str]) -> Optional['DLFToken']:
-        from .config import CatalogOptions
+        from pypaimon.common.config import CatalogOptions
         if (options.get(CatalogOptions.DLF_ACCESS_KEY_ID) is None
                 or options.get(CatalogOptions.DLF_ACCESS_KEY_SECRET) is None):
             return None
diff --git a/paimon-python/pypaimon/catalog/catalog_utils.py b/paimon-python/pypaimon/catalog/catalog_utils.py
index 2bae27aa4e..9b030b9f02 100644
--- a/paimon-python/pypaimon/catalog/catalog_utils.py
+++ b/paimon-python/pypaimon/catalog/catalog_utils.py
@@ -18,7 +18,7 @@ limitations under the License.
 from pathlib import Path
 from typing import Callable, Any
 
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.identifier import Identifier
 
 from pypaimon.catalog.table_metadata import TableMetadata
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 6651eec317..fa31a93c53 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -22,9 +22,10 @@ from urllib.parse import urlparse
 
 from pypaimon import Catalog, Database, Table
 from pypaimon.api import CatalogOptions, Identifier
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.catalog.catalog_exception import TableNotExistException, DatabaseNotExistException, \
     TableAlreadyExistException, DatabaseAlreadyExistException
+from pypaimon.common.file_io import FileIO
 from pypaimon.schema.schema_manager import SchemaManager
 from pypaimon.table.file_store_table import FileStoreTable
 
@@ -35,7 +36,7 @@ class FileSystemCatalog(Catalog):
             raise ValueError(f"Paimon '{CatalogOptions.WAREHOUSE}' path must 
be set")
         self.warehouse = catalog_options.get(CatalogOptions.WAREHOUSE)
         self.catalog_options = catalog_options
-        self.file_io = None  # FileIO(self.warehouse, self.catalog_options)
+        self.file_io = FileIO(self.warehouse, self.catalog_options)
 
     def get_database(self, name: str) -> Database:
         if self.file_io.exists(self.get_database_path(name)):
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index ab08685840..540beb6353 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -21,7 +21,7 @@ from typing import List, Dict, Optional, Union
 from pypaimon import Database, Catalog, Schema
 from pypaimon.api import RESTApi, CatalogOptions
 from pypaimon.api.api_response import PagedList, GetTableResponse
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.identifier import Identifier
 from pypaimon.api.options import Options
 
diff --git a/paimon-python/pypaimon/api/config.py b/paimon-python/pypaimon/common/config.py
similarity index 87%
rename from paimon-python/pypaimon/api/config.py
rename to paimon-python/pypaimon/common/config.py
index 2a33182993..4bfcc418e7 100644
--- a/paimon-python/pypaimon/api/config.py
+++ b/paimon-python/pypaimon/common/config.py
@@ -20,6 +20,15 @@ class OssOptions:
     OSS_ACCESS_KEY_SECRET = "fs.oss.accessKeySecret"
     OSS_SECURITY_TOKEN = "fs.oss.securityToken"
     OSS_ENDPOINT = "fs.oss.endpoint"
+    OSS_REGION = "fs.oss.region"
+
+
+class S3Options:
+    S3_ACCESS_KEY_ID = "fs.s3.accessKeyId"
+    S3_ACCESS_KEY_SECRET = "fs.s3.accessKeySecret"
+    S3_SECURITY_TOKEN = "fs.s3.securityToken"
+    S3_ENDPOINT = "fs.s3.endpoint"
+    S3_REGION = "fs.s3.region"
 
 
 class CatalogOptions:
diff --git a/paimon-python/pypaimon/api/core_options.py b/paimon-python/pypaimon/common/core_options.py
similarity index 96%
copy from paimon-python/pypaimon/api/core_options.py
copy to paimon-python/pypaimon/common/core_options.py
index 18d2fba291..1aa4fc065f 100644
--- a/paimon-python/pypaimon/api/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -44,3 +44,5 @@ class CoreOptions(str, Enum):
     FILE_BLOCK_SIZE = "file.block-size"
     # Scan options
     SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
+    # commit options
+    COMMIT_USER_PREFIX = "commit.user-prefix"
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index b58a936af4..c2d579eb53 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -15,43 +15,302 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from abc import ABC, abstractmethod
+
+import logging
+import os
+import subprocess
 from pathlib import Path
+from typing import Optional, Dict, Any, List
+from urllib.parse import urlparse, splitport
 
+import pyarrow
+from pyarrow._fs import FileSystem
 
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
+from pypaimon.common.config import OssOptions, S3Options
 
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
 
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
+class FileIO:
+    def __init__(self, warehouse: str, catalog_options: dict):
+        self.properties = catalog_options
+        self.logger = logging.getLogger(__name__)
+        scheme, netloc, path = self.parse_location(warehouse)
+        if scheme in {"oss"}:
+            self.filesystem = self._initialize_oss_fs()
+        elif scheme in {"s3", "s3a", "s3n"}:
+            self.filesystem = self._initialize_s3_fs()
+        elif scheme in {"hdfs", "viewfs"}:
+            self.filesystem = self._initialize_hdfs_fs(scheme, netloc)
+        elif scheme in {"file"}:
+            self.filesystem = self._initialize_local_fs()
+        else:
+            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
+
+    @staticmethod
+    def parse_location(location: str):
+        uri = urlparse(location)
+        if not uri.scheme:
+            return "file", uri.netloc, os.path.abspath(location)
+        elif uri.scheme in ("hdfs", "viewfs"):
+            return uri.scheme, uri.netloc, uri.path
+        else:
+            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
+
+    def _initialize_oss_fs(self) -> FileSystem:
+        from pyarrow.fs import S3FileSystem
+
+        client_kwargs = {
+            "endpoint_override": self.properties.get(OssOptions.OSS_ENDPOINT),
+            "access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID),
+            "secret_key": 
self.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET),
+            "session_token": 
self.properties.get(OssOptions.OSS_SECURITY_TOKEN),
+            "region": self.properties.get(OssOptions.OSS_REGION),
+            "force_virtual_addressing": True,
+        }
+
+        return S3FileSystem(**client_kwargs)
+
+    def _initialize_s3_fs(self) -> FileSystem:
+        from pyarrow.fs import S3FileSystem
+
+        client_kwargs = {
+            "endpoint_override": self.properties.get(S3Options.S3_ENDPOINT),
+            "access_key": self.properties.get(S3Options.S3_ACCESS_KEY_ID),
+            "secret_key": self.properties.get(S3Options.S3_ACCESS_KEY_SECRET),
+            "session_token": self.properties.get(S3Options.S3_SECURITY_TOKEN),
+            "region": self.properties.get(S3Options.S3_REGION),
+            "force_virtual_addressing": True,
+        }
+
+        return S3FileSystem(**client_kwargs)
+
+    def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import HadoopFileSystem
+
+        if 'HADOOP_HOME' not in os.environ:
+            raise RuntimeError("HADOOP_HOME environment variable is not set.")
+        if 'HADOOP_CONF_DIR' not in os.environ:
+            raise RuntimeError("HADOOP_CONF_DIR environment variable is not 
set.")
+
+        hadoop_home = os.environ.get("HADOOP_HOME")
+        native_lib_path = f"{hadoop_home}/lib/native"
+        os.environ['LD_LIBRARY_PATH'] = f"{native_lib_path}:{os.environ.get('LD_LIBRARY_PATH', '')}"
+
+        class_paths = subprocess.run(
+            [f'{hadoop_home}/bin/hadoop', 'classpath', '--glob'],
+            capture_output=True,
+            text=True,
+            check=True
+        )
+        os.environ['CLASSPATH'] = class_paths.stdout.strip()
+
+        host, port_str = splitport(netloc)
+        return HadoopFileSystem(
+            host=host,
+            port=int(port_str),
+            user=os.environ.get('HADOOP_USER_NAME', 'hadoop')
+        )
+
+    def _initialize_local_fs(self) -> FileSystem:
+        from pyarrow.fs import LocalFileSystem
+
+        return LocalFileSystem()
+
+    def new_input_stream(self, path: Path):
+        return self.filesystem.open_input_file(str(path))
+
+    def new_output_stream(self, path: Path):
+        parent_dir = path.parent
+        if str(parent_dir) and not self.exists(parent_dir):
+            self.mkdirs(parent_dir)
+
+        return self.filesystem.open_output_stream(str(path))
+
+    def get_file_status(self, path: Path):
+        file_infos = self.filesystem.get_file_info([str(path)])
+        return file_infos[0]
 
-    @abstractmethod
     def list_status(self, path: Path):
-        """"""
+        selector = pyarrow.fs.FileSelector(str(path), recursive=False, allow_not_found=True)
+        return self.filesystem.get_file_info(selector)
+
+    def list_directories(self, path: Path):
+        file_infos = self.list_status(path)
+        return [info for info in file_infos if info.type == pyarrow.fs.FileType.Directory]
+
+    def exists(self, path: Path) -> bool:
+        try:
+            file_info = self.filesystem.get_file_info([str(path)])[0]
+            return file_info.type != pyarrow.fs.FileType.NotFound
+        except Exception:
+            return False
+
+    def delete(self, path: Path, recursive: bool = False) -> bool:
+        try:
+            file_info = self.filesystem.get_file_info([str(path)])[0]
+            if file_info.type == pyarrow.fs.FileType.Directory:
+                if recursive:
+                    self.filesystem.delete_dir_contents(str(path))
+                else:
+                    self.filesystem.delete_dir(str(path))
+            else:
+                self.filesystem.delete_file(str(path))
+            return True
+        except Exception as e:
+            self.logger.warning(f"Failed to delete {path}: {e}")
+            return False
 
-    @abstractmethod
     def mkdirs(self, path: Path) -> bool:
-        """"""
+        try:
+            self.filesystem.create_dir(str(path), recursive=True)
+            return True
+        except Exception as e:
+            self.logger.warning(f"Failed to create directory {path}: {e}")
+            return False
 
-    @abstractmethod
-    def write_file(self, path: Path, content: str, overwrite: bool = False):
-        """"""
+    def rename(self, src: Path, dst: Path) -> bool:
+        try:
+            dst_parent = dst.parent
+            if str(dst_parent) and not self.exists(dst_parent):
+                self.mkdirs(dst_parent)
+
+            self.filesystem.move(str(src), str(dst))
+            return True
+        except Exception as e:
+            self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
+            return False
 
-    @abstractmethod
     def delete_quietly(self, path: Path):
-        """"""
+        if self.logger.isEnabledFor(logging.DEBUG):
+            self.logger.debug(f"Ready to delete {path}")
 
-    @abstractmethod
-    def new_input_stream(self, path: Path):
-        """"""
+        try:
+            if not self.delete(path, False) and self.exists(path):
+                self.logger.warning(f"Failed to delete file {path}")
+        except Exception:
+            self.logger.warning(f"Exception occurs when deleting file {path}", 
exc_info=True)
+
+    def delete_files_quietly(self, files: List[Path]):
+        for file_path in files:
+            self.delete_quietly(file_path)
+
+    def delete_directory_quietly(self, directory: Path):
+        if self.logger.isEnabledFor(logging.DEBUG):
+            self.logger.debug(f"Ready to delete {directory}")
+
+        try:
+            if not self.delete(directory, True) and self.exists(directory):
+                self.logger.warning(f"Failed to delete directory {directory}")
+        except Exception:
+            self.logger.warning(f"Exception occurs when deleting directory 
{directory}", exc_info=True)
+
+    def get_file_size(self, path: Path) -> int:
+        file_info = self.get_file_status(path)
+        if file_info.size is None:
+            raise ValueError(f"File size not available for {path}")
+        return file_info.size
+
+    def is_dir(self, path: Path) -> bool:
+        file_info = self.get_file_status(path)
+        return file_info.type == pyarrow.fs.FileType.Directory
+
+    def check_or_mkdirs(self, path: Path):
+        if self.exists(path):
+            if not self.is_dir(path):
+                raise ValueError(f"The path '{path}' should be a directory.")
+        else:
+            self.mkdirs(path)
+
+    def read_file_utf8(self, path: Path) -> str:
+        with self.new_input_stream(path) as input_stream:
+            return input_stream.read().decode('utf-8')
+
+    def try_to_write_atomic(self, path: Path, content: str) -> bool:
+        temp_path = path.with_suffix(path.suffix + ".tmp") if path.suffix else Path(str(path) + ".tmp")
+        success = False
+        try:
+            self.write_file(temp_path, content, False)
+            success = self.rename(temp_path, path)
+        finally:
+            if not success:
+                self.delete_quietly(temp_path)
+            return success
+
+    def write_file(self, path: Path, content: str, overwrite: bool = False):
+        with self.new_output_stream(path) as output_stream:
+            output_stream.write(content.encode('utf-8'))
+
+    def overwrite_file_utf8(self, path: Path, content: str):
+        with self.new_output_stream(path) as output_stream:
+            output_stream.write(content.encode('utf-8'))
+
+    def copy_file(self, source_path: Path, target_path: Path, overwrite: bool = False):
+        if not overwrite and self.exists(target_path):
+            raise FileExistsError(f"Target file {target_path} already exists and overwrite=False")
+
+        self.filesystem.copy_file(str(source_path), str(target_path))
+
+    def copy_files(self, source_directory: Path, target_directory: Path, overwrite: bool = False):
+        file_infos = self.list_status(source_directory)
+        for file_info in file_infos:
+            if file_info.type == pyarrow.fs.FileType.File:
+                source_file = Path(file_info.path)
+                target_file = target_directory / source_file.name
+                self.copy_file(source_file, target_file, overwrite)
+
+    def read_overwritten_file_utf8(self, path: Path) -> Optional[str]:
+        retry_number = 0
+        exception = None
+        while retry_number < 5:
+            try:
+                return self.read_file_utf8(path)
+            except FileNotFoundError:
+                return None
+            except Exception as e:
+                if not self.exists(path):
+                    return None
+
+                if (str(type(e).__name__).endswith("RemoteFileChangedException") or
+                        (str(e) and "Blocklist for" in str(e) and "has changed" in str(e))):
+                    exception = e
+                    retry_number += 1
+                else:
+                    raise e
+
+        if exception:
+            if isinstance(exception, Exception):
+                raise exception
+            else:
+                raise RuntimeError(exception)
+
+        return None
+
+    def write_parquet(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'snappy', **kwargs):
+        try:
+            import pyarrow.parquet as pq
+
+            with self.new_output_stream(path) as output_stream:
+                with pq.ParquetWriter(output_stream, data.schema, compression=compression, **kwargs) as pw:
+                    pw.write_batch(data)
+
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write Parquet file {path}: {e}") 
from e
+
+    def write_orc(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
+        try:
+            import pyarrow.orc as orc
+
+            with self.new_output_stream(path) as output_stream:
+                orc.write_table(
+                    data,
+                    output_stream,
+                    compression=compression,
+                    **kwargs
+                )
+
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e
 
-    @abstractmethod
-    def get_file_size(self, path: Path):
-        """"""
+    def write_avro(self, path: Path, table: pyarrow.RecordBatch, schema: Optional[Dict[str, Any]] = None, **kwargs):
+        raise ValueError("Unsupported write_avro yet")
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index a70d0e2d46..6775d24bcd 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -23,7 +23,6 @@ from io import BytesIO
 
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry, MANIFEST_ENTRY_SCHEMA
-from pypaimon.table.file_store_table import FileStoreTable
 from pypaimon.table.row.binary_row import BinaryRowDeserializer, BinaryRowSerializer, BinaryRow
 
 
@@ -31,6 +30,8 @@ class ManifestFileManager:
     """Writer for manifest files in Avro format using unified FileIO."""
 
     def __init__(self, table):
+        from pypaimon.table.file_store_table import FileStoreTable
+
         self.table: FileStoreTable = table
         self.manifest_path = table.table_path / "manifest"
         self.file_io = table.file_io
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index 969c6a05ef..1c58ea5b6a 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -24,14 +24,15 @@ from io import BytesIO
 
 from pypaimon.manifest.schema.manifest_file_meta import MANIFEST_FILE_META_SCHEMA
 from pypaimon.snapshot.snapshot import Snapshot
-from pypaimon.table.file_store_table import FileStoreTable
 
 
 class ManifestListManager:
     """Manager for manifest list files in Avro format using unified FileIO."""
 
-    def __init__(self, table: FileStoreTable):
-        self.table = table
+    def __init__(self, table):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
         self.manifest_path = self.table.table_path / "manifest"
         self.file_io = self.table.file_io
 
diff --git a/paimon-python/pypaimon/pvfs/__init__.py b/paimon-python/pypaimon/pvfs/__init__.py
index 2b425d0251..ec1cb0f05c 100644
--- a/paimon-python/pypaimon/pvfs/__init__.py
+++ b/paimon-python/pypaimon/pvfs/__init__.py
@@ -33,7 +33,7 @@ from fsspec.implementations.local import LocalFileSystem
 
 from pypaimon.api import RESTApi, GetTableTokenResponse, Schema, Identifier, GetTableResponse
 from pypaimon.api.client import NoSuchResourceException, AlreadyExistsException
-from pypaimon.api.config import CatalogOptions, OssOptions, PVFSOptions
+from pypaimon.common.config import CatalogOptions, OssOptions, PVFSOptions
 
 PROTOCOL_NAME = "pvfs"
 
diff --git a/paimon-python/pypaimon/read/read_builder.py b/paimon-python/pypaimon/read/read_builder.py
index 604eff9ff1..30f824a698 100644
--- a/paimon-python/pypaimon/read/read_builder.py
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -23,14 +23,15 @@ from pypaimon.common.predicate_builder import PredicateBuilder
 from pypaimon.read.table_read import TableRead
 from pypaimon.read.table_scan import TableScan
 from pypaimon.schema.data_types import DataField
-from pypaimon.table.file_store_table import FileStoreTable
 
 
 class ReadBuilder:
     """Implementation of ReadBuilder for native Python reading."""
 
-    def __init__(self, table: FileStoreTable):
-        self.table = table
+    def __init__(self, table):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
         self._predicate: Optional[Predicate] = None
         self._projection: Optional[List[str]] = None
         self._limit: Optional[int] = None
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index fb56d70a82..27015d3eda 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -29,15 +29,16 @@ from pypaimon.read.plan import Plan
 from pypaimon.read.split import Split
 from pypaimon.schema.data_types import DataField
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
-from pypaimon.table.file_store_table import FileStoreTable
 
 
 class TableScan:
     """Implementation of TableScan for native Python reading."""
 
-    def __init__(self, table: FileStoreTable, predicate: Optional[Predicate], limit: Optional[int],
+    def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int],
                  read_type: List[DataField]):
-        self.table = table
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
         self.predicate = predicate
         self.limit = limit
diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py
index 635cb3464d..736de902db 100644
--- a/paimon-python/pypaimon/schema/table_schema.py
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -27,7 +27,7 @@ import pyarrow
 from pypaimon import Schema
 from pypaimon.common.rest_json import json_field
 from pypaimon.schema import data_types
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.file_io import FileIO
 from pypaimon.schema.data_types import DataField
 
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index 9e500c48d2..6a8a68e73a 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -21,14 +21,15 @@ from typing import Optional
 
 from pypaimon.common.file_io import FileIO
 from pypaimon.snapshot.snapshot import Snapshot
-from pypaimon.table.file_store_table import FileStoreTable
 
 
 class SnapshotManager:
     """Manager for snapshot files using unified FileIO."""
 
-    def __init__(self, table: FileStoreTable):
-        self.table = table
+    def __init__(self, table):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
         self.file_io: FileIO = self.table.file_io
         self.snapshot_dir = self.table.table_path / "snapshot"
 
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index e1b8322fe8..225edd1cc4 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -19,12 +19,14 @@
 from pathlib import Path
 
 from pypaimon import Table
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.identifier import Identifier
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.common.file_io import FileIO
 from pypaimon.schema.schema_manager import SchemaManager
 from pypaimon.table.bucket_mode import BucketMode
+from pypaimon.write.batch_write_builder import BatchWriteBuilder
+from pypaimon.write.row_key_extractor import RowKeyExtractor, FixedBucketRowKeyExtractor, UnawareBucketRowKeyExtractor
 
 
 class FileStoreTable(Table):
@@ -56,3 +58,20 @@ class FileStoreTable(Table):
                 return BucketMode.BUCKET_UNAWARE
             else:
                 return BucketMode.HASH_FIXED
+
+    def new_read_builder(self) -> 'ReadBuilder':
+        pass
+
+    def new_batch_write_builder(self) -> BatchWriteBuilder:
+        return BatchWriteBuilder(self)
+
+    def create_row_key_extractor(self) -> RowKeyExtractor:
+        bucket_mode = self.bucket_mode()
+        if bucket_mode == BucketMode.HASH_FIXED:
+            return FixedBucketRowKeyExtractor(self.table_schema)
+        elif bucket_mode == BucketMode.BUCKET_UNAWARE:
+            return UnawareBucketRowKeyExtractor(self.table_schema)
+        elif bucket_mode == BucketMode.HASH_DYNAMIC or bucket_mode == BucketMode.CROSS_PARTITION:
+            raise ValueError(f"Unsupported bucket mode {bucket_mode} yet")
+        else:
+            raise ValueError(f"Unsupported bucket mode: {bucket_mode}")
diff --git a/paimon-python/pypaimon/table/table.py b/paimon-python/pypaimon/table/table.py
index 6f15b049ae..3a1fe2e622 100644
--- a/paimon-python/pypaimon/table/table.py
+++ b/paimon-python/pypaimon/table/table.py
@@ -16,8 +16,19 @@
 # limitations under the License.
 
#################################################################################
 
-from abc import ABC
+from abc import ABC, abstractmethod
+
+from pypaimon.read.read_builder import ReadBuilder
+from pypaimon.write.batch_write_builder import BatchWriteBuilder
 
 
 class Table(ABC):
     """A table provides basic abstraction for table read and write."""
+
+    @abstractmethod
+    def new_read_builder(self) -> ReadBuilder:
+        """Return a builder for building table scan and table read."""
+
+    @abstractmethod
+    def new_batch_write_builder(self) -> BatchWriteBuilder:
+        """Returns a builder for building batch table write and table 
commit."""
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
new file mode 100644
index 0000000000..5b0dc81b2f
--- /dev/null
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import shutil
+import tempfile
+import unittest
+
+from pypaimon import Schema
+from pypaimon.catalog.catalog_exception import DatabaseAlreadyExistException, TableAlreadyExistException, \
+    DatabaseNotExistException, TableNotExistException
+from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.file_store_table import FileStoreTable
+
+
+class FileSystemCatalogTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.temp_dir = tempfile.mkdtemp(prefix="unittest_")
+        self.warehouse = os.path.join(self.temp_dir, 'test_dir')
+
+    def tearDown(self):
+        shutil.rmtree(self.temp_dir, ignore_errors=True)
+
+    def test_database(self):
+        catalog = CatalogFactory.create({
+            "warehouse": self.warehouse
+        })
+        catalog.create_database("test_db", False)
+        self.assertTrue(os.path.exists(self.warehouse + "/test_db.db"))
+
+        with self.assertRaises(DatabaseAlreadyExistException):
+            catalog.create_database("test_db", False)
+
+        catalog.create_database("test_db", True)
+
+        with self.assertRaises(DatabaseNotExistException):
+            catalog.get_database("test_db_x")
+
+        database = catalog.get_database("test_db")
+        self.assertEqual(database.name, "test_db")
+
+    def test_table(self):
+        fields = [
+            DataField.from_dict({"id": 1, "name": "f0", "type": "INT"}),
+            DataField.from_dict({"id": 2, "name": "f1", "type": "INT"}),
+            DataField.from_dict({"id": 3, "name": "f2", "type": "STRING"}),
+        ]
+        catalog = CatalogFactory.create({
+            "warehouse": self.warehouse
+        })
+        catalog.create_database("test_db", False)
+        catalog.create_table("test_db.test_table", Schema(fields=fields), 
False)
+        self.assertTrue(os.path.exists(self.warehouse + 
"/test_db.db/test_table/schema/schema-0"))
+
+        with self.assertRaises(TableAlreadyExistException):
+            catalog.create_table("test_db.test_table", Schema(fields=fields), 
False)
+
+        catalog.create_table("test_db.test_table", Schema(fields=fields), True)
+
+        database = catalog.get_database("test_db")
+        self.assertEqual(database.name, "test_db")
+
+        with self.assertRaises(TableNotExistException):
+            catalog.get_table("test_db.test_table_x")
+
+        table = catalog.get_table("test_db.test_table")
+        self.assertTrue(table is not None)
+        self.assertTrue(isinstance(table, FileStoreTable))
diff --git a/paimon-python/pypaimon/tests/writer_test.py b/paimon-python/pypaimon/tests/writer_test.py
new file mode 100644
index 0000000000..9a8230b0eb
--- /dev/null
+++ b/paimon-python/pypaimon/tests/writer_test.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import glob
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow
+
+from pypaimon import Schema
+from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.schema.data_types import DataField
+
+
+class WriterTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.temp_dir = tempfile.mkdtemp(prefix="unittest_")
+        self.warehouse = os.path.join(self.temp_dir, 'test_dir')
+
+    def tearDown(self):
+        shutil.rmtree(self.temp_dir, ignore_errors=True)
+
+    def test_writer(self):
+        pa_schema = pyarrow.schema([
+            ('f0', pyarrow.int32()),
+            ('f1', pyarrow.string()),
+            ('f2', pyarrow.string())
+        ])
+        fields = [
+            DataField.from_dict({"id": 1, "name": "f0", "type": "INT"}),
+            DataField.from_dict({"id": 2, "name": "f1", "type": "STRING"}),
+            DataField.from_dict({"id": 3, "name": "f2", "type": "STRING"}),
+        ]
+        catalog = CatalogFactory.create({
+            "warehouse": self.warehouse
+        })
+        catalog.create_database("test_db", False)
+        catalog.create_table("test_db.test_table", Schema(fields=fields), 
False)
+        table = catalog.get_table("test_db.test_table")
+
+        data = {
+            'f0': [1, 2, 3],
+            'f1': ['a', 'b', 'c'],
+            'f2': ['X', 'Y', 'Z']
+        }
+        expect = pyarrow.Table.from_pydict(data, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(expect)
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+        table_write.close()
+        table_commit.close()
+
+        self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/snapshot/LATEST"))
+        self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/snapshot/snapshot-1"))
+        self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/manifest"))
+        self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/bucket-0"))
+        self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/manifest/*.avro")), 2)
+        self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/bucket-0/*.parquet")), 1)
diff --git a/paimon-python/pypaimon/write/batch_table_commit.py b/paimon-python/pypaimon/write/batch_table_commit.py
new file mode 100644
index 0000000000..14849e6309
--- /dev/null
+++ b/paimon-python/pypaimon/write/batch_table_commit.py
@@ -0,0 +1,72 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import time
+from typing import List, Optional
+
+from pypaimon.write.commit_message import CommitMessage
+from pypaimon.write.file_store_commit import FileStoreCommit
+
+
+class BatchTableCommit:
+    """Python implementation of BatchTableCommit for batch writing 
scenarios."""
+
+    def __init__(self, table, commit_user: str, static_partition: Optional[dict]):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
+        self.commit_user = commit_user
+        self.overwrite_partition = static_partition
+        self.file_store_commit = FileStoreCommit(table, commit_user)
+        self.batch_committed = False
+
+    def commit(self, commit_messages: List[CommitMessage]):
+        self._check_committed()
+
+        non_empty_messages = [msg for msg in commit_messages if not msg.is_empty()]
+        if not non_empty_messages:
+            return
+
+        commit_identifier = int(time.time() * 1000)
+
+        try:
+            if self.overwrite_partition is not None:
+                self.file_store_commit.overwrite(
+                    partition=self.overwrite_partition,
+                    commit_messages=non_empty_messages,
+                    commit_identifier=commit_identifier
+                )
+            else:
+                self.file_store_commit.commit(
+                    commit_messages=non_empty_messages,
+                    commit_identifier=commit_identifier
+                )
+        except Exception as e:
+            self.file_store_commit.abort(commit_messages)
+            raise RuntimeError(f"Failed to commit: {str(e)}") from e
+
+    def abort(self, commit_messages: List[CommitMessage]):
+        self.file_store_commit.abort(commit_messages)
+
+    def close(self):
+        self.file_store_commit.close()
+
+    def _check_committed(self):
+        if self.batch_committed:
+            raise RuntimeError("BatchTableCommit only supports one-time 
committing.")
+        self.batch_committed = True
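BatchTableCommit is single-shot: _check_committed flips batch_committed on the first call, so reusing the instance raises. A hedged usage sketch (names illustrative; `messages` would come from BatchTableWrite.prepare_commit()):

    commit = write_builder.new_commit()
    try:
        commit.commit(messages)   # empty messages are filtered out first
    finally:
        commit.close()
    # a second commit() on the same instance raises RuntimeError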
diff --git a/paimon-python/pypaimon/write/batch_table_write.py b/paimon-python/pypaimon/write/batch_table_write.py
new file mode 100644
index 0000000000..921de1151f
--- /dev/null
+++ b/paimon-python/pypaimon/write/batch_table_write.py
@@ -0,0 +1,63 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pyarrow as pa
+from collections import defaultdict
+
+from typing import List
+
+from pypaimon.write.commit_message import CommitMessage
+from pypaimon.write.file_store_write import FileStoreWrite
+
+
+class BatchTableWrite:
+    def __init__(self, table):
+        self.file_store_write = FileStoreWrite(table)
+        self.row_key_extractor = table.create_row_key_extractor()
+        self.batch_committed = False
+
+    def write_arrow(self, table: pa.Table, row_kind: List[int] = None):
+        # TODO: support row_kind
+        batches_iterator = table.to_batches()
+        for batch in batches_iterator:
+            self.write_arrow_batch(batch)
+
+    def write_arrow_batch(self, data: pa.RecordBatch, row_kind: List[int] = None):
+        # TODO: support row_kind
+        partitions, buckets = self.row_key_extractor.extract_partition_bucket_batch(data)
+
+        partition_bucket_groups = defaultdict(list)
+        for i in range(data.num_rows):
+            partition_bucket_groups[(tuple(partitions[i]), buckets[i])].append(i)
+
+        for (partition, bucket), row_indices in partition_bucket_groups.items():
+            indices_array = pa.array(row_indices, type=pa.int64())
+            sub_table = pa.compute.take(data, indices_array)
+            self.file_store_write.write(partition, bucket, sub_table)
+
+    def write_pandas(self, dataframe):
+        raise ValueError("Not implemented yet")
+
+    def prepare_commit(self) -> List[CommitMessage]:
+        if self.batch_committed:
+            raise RuntimeError("BatchTableWrite only supports one-time 
committing.")
+        self.batch_committed = True
+        return self.file_store_write.prepare_commit()
+
+    def close(self):
+        self.file_store_write.close()
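BatchTableWrite fans each Arrow batch out to one writer per (partition, bucket) pair. The grouping step in isolation, on toy data (single group assumed, as for an unpartitioned bucket-unaware table):

    import pyarrow as pa

    batch = pa.RecordBatch.from_pydict({'f0': [1, 2, 3], 'f1': ['a', 'b', 'c']})
    # Here every row maps to the empty partition () and bucket 0, so a
    # single sub-batch is taken with the collected row indices.
    indices = pa.array([0, 1, 2], type=pa.int64())
    sub_batch = batch.take(indices)
    assert sub_batch.num_rows == 3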
diff --git a/paimon-python/pypaimon/write/batch_write_builder.py b/paimon-python/pypaimon/write/batch_write_builder.py
new file mode 100644
index 0000000000..e23455f3fc
--- /dev/null
+++ b/paimon-python/pypaimon/write/batch_write_builder.py
@@ -0,0 +1,51 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import uuid
+
+from typing import Optional
+
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.write.batch_table_commit import BatchTableCommit
+from pypaimon.write.batch_table_write import BatchTableWrite
+
+
+class BatchWriteBuilder:
+    def __init__(self, table):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
+        self.commit_user = self._create_commit_user()
+        self.static_partition = None
+
+    def overwrite(self, static_partition: Optional[dict] = None):
+        self.static_partition = static_partition
+        return self
+
+    def new_write(self) -> BatchTableWrite:
+        return BatchTableWrite(self.table)
+
+    def new_commit(self) -> BatchTableCommit:
+        commit = BatchTableCommit(self.table, self.commit_user, self.static_partition)
+        return commit
+
+    def _create_commit_user(self):
+        if CoreOptions.COMMIT_USER_PREFIX in self.table.options:
+            return f"{self.table.options.get(CoreOptions.COMMIT_USER_PREFIX)}_{uuid.uuid4()}"
+        else:
+            return str(uuid.uuid4())
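Besides plain appends, the builder can mark a commit as an overwrite; a usage sketch (the partition spec is hypothetical):

    # overwrite() returns the builder itself, so calls chain; the dict is
    # handed to BatchTableCommit as the static partition to overwrite.
    write_builder = table.new_batch_write_builder().overwrite({'dt': '20250801'})
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()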
diff --git a/paimon-python/pypaimon/api/core_options.py b/paimon-python/pypaimon/write/commit_message.py
similarity index 55%
rename from paimon-python/pypaimon/api/core_options.py
rename to paimon-python/pypaimon/write/commit_message.py
index 18d2fba291..fc36f13e5b 100644
--- a/paimon-python/pypaimon/api/core_options.py
+++ b/paimon-python/pypaimon/write/commit_message.py
@@ -16,31 +16,30 @@
 # limitations under the License.
 
################################################################################
 
-from enum import Enum
-
-
-class CoreOptions(str, Enum):
-    """Core options for paimon."""
-
-    def __str__(self):
-        return self.value
-
-    # Basic options
-    AUTO_CREATE = "auto-create"
-    PATH = "path"
-    TYPE = "type"
-    BRANCH = "branch"
-    BUCKET = "bucket"
-    BUCKET_KEY = "bucket-key"
-    WAREHOUSE = "warehouse"
-    # File format options
-    FILE_FORMAT = "file.format"
-    FILE_FORMAT_ORC = "orc"
-    FILE_FORMAT_AVRO = "avro"
-    FILE_FORMAT_PARQUET = "parquet"
-    FILE_COMPRESSION = "file.compression"
-    FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
-    FILE_FORMAT_PER_LEVEL = "file.format.per.level"
-    FILE_BLOCK_SIZE = "file.block-size"
-    # Scan options
-    SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
+from typing import Tuple, List
+
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+
+
+class CommitMessage:
+    """Python implementation of CommitMessage"""
+
+    def __init__(self, partition: Tuple, bucket: int, new_files: List[DataFileMeta]):
+        self._partition = partition
+        self._bucket = bucket
+        self._new_files = new_files or []
+
+    def partition(self) -> Tuple:
+        """Get the partition of this commit message."""
+        return self._partition
+
+    def bucket(self) -> int:
+        """Get the bucket of this commit message."""
+        return self._bucket
+
+    def new_files(self) -> List[DataFileMeta]:
+        """Get the list of new files."""
+        return self._new_files
+
+    def is_empty(self):
+        return not self._new_files
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
new file mode 100644
index 0000000000..4928167907
--- /dev/null
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -0,0 +1,120 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import time
+from pathlib import Path
+from typing import List
+
+from pypaimon.manifest.manifest_file_manager import ManifestFileManager
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.write.commit_message import CommitMessage
+
+
+class FileStoreCommit:
+    """Core commit logic for file store operations."""
+
+    def __init__(self, table, commit_user: str):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
+        self.commit_user = commit_user
+
+        self.snapshot_manager = SnapshotManager(table)
+        self.manifest_file_manager = ManifestFileManager(table)
+        self.manifest_list_manager = ManifestListManager(table)
+
+        self.manifest_target_size = 8 * 1024 * 1024
+        self.manifest_merge_min_count = 30
+
+    def commit(self, commit_messages: List[CommitMessage], commit_identifier: int):
+        """Commit the given commit messages in normal append mode."""
+        if not commit_messages:
+            return
+
+        new_manifest_files = self.manifest_file_manager.write(commit_messages)
+        if not new_manifest_files:
+            return
+        latest_snapshot = self.snapshot_manager.get_latest_snapshot()
+        existing_manifest_files = []
+        if latest_snapshot:
+            existing_manifest_files = self.manifest_list_manager.read_all_manifest_files(latest_snapshot)
+        new_manifest_files.extend(existing_manifest_files)
+        manifest_list = self.manifest_list_manager.write(new_manifest_files)
+
+        new_snapshot_id = self._generate_snapshot_id()
+        snapshot_data = Snapshot(
+            version=3,
+            id=new_snapshot_id,
+            schema_id=0,
+            base_manifest_list=manifest_list,
+            delta_manifest_list=manifest_list,
+            commit_user=self.commit_user,
+            commit_identifier=commit_identifier,
+            commit_kind="APPEND",
+            time_millis=int(time.time() * 1000),
+            log_offsets={},
+        )
+        self.snapshot_manager.commit_snapshot(new_snapshot_id, snapshot_data)
+
+    def overwrite(self, partition, commit_messages: List[CommitMessage], commit_identifier: int):
+        if not commit_messages:
+            return
+
+        new_manifest_files = self.manifest_file_manager.write(commit_messages)
+        if not new_manifest_files:
+            return
+
+        # In overwrite mode, we don't merge with existing manifests
+        manifest_list = self.manifest_list_manager.write(new_manifest_files)
+
+        new_snapshot_id = self._generate_snapshot_id()
+        snapshot_data = Snapshot(
+            version=3,
+            id=new_snapshot_id,
+            schema_id=0,
+            base_manifest_list=manifest_list,
+            delta_manifest_list=manifest_list,
+            commit_user=self.commit_user,
+            commit_identifier=commit_identifier,
+            commit_kind="OVERWRITE",
+            time_millis=int(time.time() * 1000),
+            log_offsets={},
+        )
+        self.snapshot_manager.commit_snapshot(new_snapshot_id, snapshot_data)
+
+    def abort(self, commit_messages: List[CommitMessage]):
+        for message in commit_messages:
+            for file in message.new_files():
+                try:
+                    file_path_obj = Path(file.file_path)
+                    if file_path_obj.exists():
+                        file_path_obj.unlink()
+                except Exception as e:
+                    print(f"Warning: Failed to clean up file {file.file_path}: 
{e}")
+
+    def close(self):
+        pass
+
+    def _generate_snapshot_id(self) -> int:
+        latest_snapshot = self.snapshot_manager.get_latest_snapshot()
+        if latest_snapshot:
+            return latest_snapshot.id + 1
+        else:
+            return 1
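FileStoreCommit writes new manifest files, merges in the previous snapshot's manifests for APPEND commits, and allocates snapshot ids sequentially (1 for an empty table, latest id + 1 afterwards). Roughly, as driven by BatchTableCommit above:

    import time
    from pypaimon.write.file_store_commit import FileStoreCommit

    # `table` and `messages` as produced by the write path; the commit
    # user string is illustrative.
    fsc = FileStoreCommit(table, commit_user="demo-user")
    fsc.commit(commit_messages=messages, commit_identifier=int(time.time() * 1000))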
diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
new file mode 100644
index 0000000000..d1002bc094
--- /dev/null
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -0,0 +1,75 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pyarrow as pa
+from typing import Dict, Tuple, List
+
+from pypaimon.write.commit_message import CommitMessage
+from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
+from pypaimon.write.writer.data_writer import DataWriter
+from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
+
+
+class FileStoreWrite:
+    """Base class for file store write operations."""
+
+    def __init__(self, table):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
+        self.data_writers: Dict[Tuple, DataWriter] = {}
+
+    def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
+        key = (partition, bucket)
+        if key not in self.data_writers:
+            self.data_writers[key] = self._create_data_writer(partition, bucket)
+        writer = self.data_writers[key]
+        writer.write(data)
+
+    def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter:
+        if self.table.is_primary_key_table:
+            return KeyValueDataWriter(
+                table=self.table,
+                partition=partition,
+                bucket=bucket,
+            )
+        else:
+            return AppendOnlyDataWriter(
+                table=self.table,
+                partition=partition,
+                bucket=bucket,
+            )
+
+    def prepare_commit(self) -> List[CommitMessage]:
+        commit_messages = []
+        for (partition, bucket), writer in self.data_writers.items():
+            committed_files = writer.prepare_commit()
+            if committed_files:
+                commit_message = CommitMessage(
+                    partition=partition,
+                    bucket=bucket,
+                    new_files=committed_files
+                )
+                commit_messages.append(commit_message)
+        return commit_messages
+
+    def close(self):
+        """Close all data writers and clean up resources."""
+        for writer in self.data_writers.values():
+            writer.close()
+        self.data_writers.clear()
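
A short usage sketch for FileStoreWrite as defined above; the `table` object is assumed to be a FileStoreTable obtained elsewhere, while the write()/prepare_commit()/close() calls mirror the class itself:

    import pyarrow as pa

    write = FileStoreWrite(table)  # `table` is an assumed FileStoreTable
    batch = pa.RecordBatch.from_pydict({"id": [1, 2], "v": ["a", "b"]})

    # A DataWriter is created lazily per (partition, bucket) pair and
    # reused for later batches routed to the same pair.
    write.write(partition=(), bucket=0, data=batch)

    # prepare_commit() drains every writer into CommitMessage objects,
    # one per (partition, bucket) that actually produced files.
    messages = write.prepare_commit()
    write.close()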
diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py
new file mode 100644
index 0000000000..cda3ad07ba
--- /dev/null
+++ b/paimon-python/pypaimon/write/row_key_extractor.py
@@ -0,0 +1,102 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+import pyarrow as pa
+from typing import Tuple, List
+from abc import ABC, abstractmethod
+
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.schema.table_schema import TableSchema
+
+
+class RowKeyExtractor(ABC):
+    """Base class for extracting partition and bucket information from PyArrow 
data."""
+
+    def __init__(self, table_schema: TableSchema):
+        self.table_schema = table_schema
+        self.partition_indices = self._get_field_indices(table_schema.partition_keys)
+
+    def extract_partition_bucket_batch(self, data: pa.RecordBatch) -> Tuple[List[Tuple], List[int]]:
+        partitions = self._extract_partitions_batch(data)
+        buckets = self._extract_buckets_batch(data)
+        return partitions, buckets
+
+    def _get_field_indices(self, field_names: List[str]) -> List[int]:
+        if not field_names:
+            return []
+        field_map = {field.name: i for i, field in enumerate(self.table_schema.fields)}
+        return [field_map[name] for name in field_names if name in field_map]
+
+    def _extract_partitions_batch(self, data: pa.RecordBatch) -> List[Tuple]:
+        if not self.partition_indices:
+            return [() for _ in range(data.num_rows)]
+
+        partition_columns = [data.column(i) for i in self.partition_indices]
+
+        partitions = []
+        for row_idx in range(data.num_rows):
+            partition_values = tuple(col[row_idx].as_py() for col in partition_columns)
+            partitions.append(partition_values)
+
+        return partitions
+
+    @abstractmethod
+    def _extract_buckets_batch(self, table: pa.RecordBatch) -> List[int]:
+        """Extract bucket numbers for all rows. Must be implemented by 
subclasses."""
+        pass
+
+
+class FixedBucketRowKeyExtractor(RowKeyExtractor):
+    """Fixed bucket mode extractor with configurable number of buckets."""
+
+    def __init__(self, table_schema: TableSchema):
+        super().__init__(table_schema)
+        self.num_buckets = table_schema.options.get(CoreOptions.BUCKET, -1)
+        if self.num_buckets <= 0:
+            raise ValueError(f"Fixed bucket mode requires bucket > 0, got 
{self.num_buckets}")
+
+        bucket_key_option = table_schema.options.get(CoreOptions.BUCKET_KEY, '')
+        if bucket_key_option.strip():
+            self.bucket_keys = [k.strip() for k in bucket_key_option.split(',')]
+        else:
+            self.bucket_keys = [pk for pk in table_schema.primary_keys
+                                if pk not in table_schema.partition_keys]
+
+        self.bucket_key_indices = self._get_field_indices(self.bucket_keys)
+
+    def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]:
+        columns = [data.column(i) for i in self.bucket_key_indices]
+        hashes = []
+        for row_idx in range(data.num_rows):
+            row_values = tuple(col[row_idx].as_py() for col in columns)
+            hashes.append(hash(row_values))
+        return [abs(hash_val) % self.num_buckets for hash_val in hashes]
+
+
+class UnawareBucketRowKeyExtractor(RowKeyExtractor):
+    """Extractor for unaware bucket mode (bucket = -1, no primary keys)."""
+
+    def __init__(self, table_schema: TableSchema):
+        super().__init__(table_schema)
+        num_buckets = table_schema.options.get(CoreOptions.BUCKET, -1)
+
+        if num_buckets != -1:
+            raise ValueError(f"Unaware bucket mode requires bucket = -1, got 
{num_buckets}")
+
+    def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]:
+        return [0] * data.num_rows
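
The fixed-bucket routing above reduces to abs(hash(bucket_key_values)) % num_buckets per row. A self-contained sketch of that arithmetic; note that Python's built-in hash() is salted per process (PYTHONHASHSEED), so assignments are only stable within a single run:

    import pyarrow as pa

    num_buckets = 4
    batch = pa.RecordBatch.from_pydict({"user_id": [1, 2, 3], "v": ["a", "b", "c"]})

    # Mirror FixedBucketRowKeyExtractor with bucket key ["user_id"]:
    # hash the tuple of bucket-key values for each row, then take the
    # non-negative remainder modulo the configured bucket count.
    col = batch.column(0)
    buckets = [abs(hash((col[i].as_py(),))) % num_buckets
               for i in range(batch.num_rows)]
    print(buckets)  # e.g. [1, 3, 0]; values vary across processes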
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 78de30a459..a661bd7543 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -23,17 +23,18 @@ from typing import Tuple, Optional, List
 from pathlib import Path
 from abc import ABC, abstractmethod
 
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
-from pypaimon.table.file_store_table import FileStoreTable
 from pypaimon.table.row.binary_row import BinaryRow
 
 
 class DataWriter(ABC):
     """Base class for data writers that handle PyArrow tables directly."""
 
-    def __init__(self, table: FileStoreTable, partition: Tuple, bucket: int):
-        self.table = table
+    def __init__(self, table, partition: Tuple, bucket: int):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
         self.partition = partition
         self.bucket = bucket
 
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index 9b10236e14..d4b93ee987 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -18,7 +18,7 @@
 
 import pyarrow as pa
 import pyarrow.compute as pc
-from typing import Tuple, Dict
+from typing import Tuple
 
 from pypaimon.write.writer.data_writer import DataWriter
 
@@ -26,12 +26,10 @@ from pypaimon.write.writer.data_writer import DataWriter
 class KeyValueDataWriter(DataWriter):
     """Data writer for primary key tables with system fields and sorting."""
 
-    def __init__(self, partition: Tuple, bucket: int, file_io, table_schema, table_identifier,
-                 target_file_size: int, options: Dict[str, str]):
-        super().__init__(partition, bucket, file_io, table_schema, table_identifier,
-                         target_file_size, options)
+    def __init__(self, table, partition: Tuple, bucket: int):
+        super().__init__(table, partition, bucket)
         self.sequence_generator = SequenceGenerator()
-        self.trimmed_primary_key = [field.name for field in self.table.table_schema.get_trimmed_primary_key_fields()]
+        self.trimmed_primary_key = [field.name for field in self.trimmed_primary_key_fields]
 
     def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
         enhanced_data = self._add_system_fields(data)
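
For context on _process_data above: each row receives system fields (including a per-writer sequence number from the SequenceGenerator) before sorting. A hedged sketch of how a sequence column can be appended with pyarrow 15; the "_SEQUENCE_NUMBER" column name and this SequenceGenerator are illustrative, not the exact implementation:

    import pyarrow as pa

    class SequenceGenerator:
        """Illustrative monotonically increasing counter, one per writer."""
        def __init__(self, start: int = 0):
            self.current = start

        def next(self) -> int:
            self.current += 1
            return self.current

    def add_sequence_field(batch: pa.RecordBatch,
                           seq: SequenceGenerator) -> pa.RecordBatch:
        # Append one sequence number per row; the column name is assumed.
        values = pa.array([seq.next() for _ in range(batch.num_rows)], pa.int64())
        return pa.RecordBatch.from_arrays(
            batch.columns + [values],
            schema=batch.schema.append(pa.field("_SEQUENCE_NUMBER", pa.int64())))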
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 499c6ab8dc..8fa06e11bc 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -27,6 +27,7 @@ install_requires = [
     'cachetools==5.3.3',
     'ossfs==2023.12.0',
     'fastavro==1.11.1',
+    'pyarrow==15.0.2'
 ]
 
 long_description = "See Apache Paimon Python API \
