This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 4ec5e81a6a [python] Support table write for PyPaimon (#6001)
4ec5e81a6a is described below

commit 4ec5e81a6aa967b8e496c9d6a7bae8b0c70c73b7
Author: ChengHui Chen <27797326+chenghuic...@users.noreply.github.com>
AuthorDate: Fri Aug 1 10:08:11 2025 +0800

    [python] Support table write for PyPaimon (#6001)
---
 .github/workflows/paimon-python-checks.yml          |   2 +-
 paimon-python/pypaimon/api/__init__.py              |   2 +-
 paimon-python/pypaimon/api/auth.py                  |   2 +-
 paimon-python/pypaimon/api/token_loader.py          |   4 +-
 paimon-python/pypaimon/catalog/catalog_utils.py     |   2 +-
 .../pypaimon/catalog/filesystem_catalog.py          |   5 +-
 .../pypaimon/catalog/rest/rest_catalog.py           |   2 +-
 paimon-python/pypaimon/{api => common}/config.py    |   9 +
 .../pypaimon/{api => common}/core_options.py        |   2 +
 paimon-python/pypaimon/common/file_io.py            | 311 +++++++++++++++++++--
 .../pypaimon/manifest/manifest_file_manager.py      |   3 +-
 .../pypaimon/manifest/manifest_list_manager.py      |   7 +-
 paimon-python/pypaimon/pvfs/__init__.py             |   2 +-
 paimon-python/pypaimon/read/read_builder.py         |   7 +-
 paimon-python/pypaimon/read/table_scan.py           |   7 +-
 paimon-python/pypaimon/schema/table_schema.py       |   2 +-
 .../pypaimon/snapshot/snapshot_manager.py           |   7 +-
 paimon-python/pypaimon/table/file_store_table.py    |  21 +-
 paimon-python/pypaimon/table/table.py               |  13 +-
 .../pypaimon/tests/filesystem_catalog_test.py       |  83 ++++++
 paimon-python/pypaimon/tests/writer_test.py         |  78 ++++++
 paimon-python/pypaimon/write/batch_table_commit.py  |  72 +++++
 paimon-python/pypaimon/write/batch_table_write.py   |  63 +++++
 .../pypaimon/write/batch_write_builder.py           |  51 ++++
 .../core_options.py => write/commit_message.py}     |  55 ++--
 paimon-python/pypaimon/write/file_store_commit.py   | 120 ++++++++
 paimon-python/pypaimon/write/file_store_write.py    |  75 +++++
 paimon-python/pypaimon/write/row_key_extractor.py   | 102 +++++++
 paimon-python/pypaimon/write/writer/data_writer.py  |   9 +-
 .../pypaimon/write/writer/key_value_data_writer.py  |  10 +-
 paimon-python/setup.py                              |   1 +
 31 files changed, 1039 insertions(+), 90 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index 2a3a22e1f6..de6c35dac4 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -45,7 +45,7 @@ jobs:
           python-version: ${{ env.PYTHON_VERSION }}
       - name: Install dependencies
         run: |
-          python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 pyarrow==15.0.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 requests 2>&1 >/dev/null
+          python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==15.0.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 requests 2>&1 >/dev/null
       - name: Run lint-python.sh
         run: |
           chmod +x paimon-python/dev/lint-python.sh
diff --git a/paimon-python/pypaimon/api/__init__.py b/paimon-python/pypaimon/api/__init__.py
index 509c4d91a2..4c99ff6ed0 100644
--- a/paimon-python/pypaimon/api/__init__.py
+++ b/paimon-python/pypaimon/api/__init__.py
@@ -31,7 +31,7 @@ from pypaimon.api.api_response import (
 )
 from pypaimon.api.api_resquest import CreateDatabaseRequest, AlterDatabaseRequest, RenameTableRequest, \
     CreateTableRequest
-from pypaimon.api.config import CatalogOptions
+from pypaimon.common.config import CatalogOptions
 from pypaimon.api.client import HttpClient
 from pypaimon.common.identifier import Identifier
 from pypaimon.api.typedef import T
diff --git a/paimon-python/pypaimon/api/auth.py b/paimon-python/pypaimon/api/auth.py
index 393fa15920..eef71e3664 100644
--- a/paimon-python/pypaimon/api/auth.py
+++ b/paimon-python/pypaimon/api/auth.py
@@ -27,7 +27,7 @@ from typing import Optional, Dict
 
 from .token_loader import DLFTokenLoader, DLFToken
 from .typedef import RESTAuthParameter
-from .config import CatalogOptions
+from pypaimon.common.config import CatalogOptions
 
 
 class AuthProvider(ABC):
diff --git a/paimon-python/pypaimon/api/token_loader.py b/paimon-python/pypaimon/api/token_loader.py
index 9214345fce..bfc1ef173e 100644
--- a/paimon-python/pypaimon/api/token_loader.py
+++ b/paimon-python/pypaimon/api/token_loader.py
@@ -25,7 +25,7 @@ from requests.adapters import HTTPAdapter
 from requests.exceptions import RequestException
 
 from pypaimon.common.rest_json import json_field, JSON
-from .config import CatalogOptions
+from pypaimon.common.config import CatalogOptions
 from .client import ExponentialRetry
 
 
@@ -59,7 +59,7 @@ class DLFToken:
 
     @classmethod
     def from_options(cls, options: Dict[str, str]) -> Optional['DLFToken']:
-        from .config import CatalogOptions
+        from pypaimon.common.config import CatalogOptions
         if (options.get(CatalogOptions.DLF_ACCESS_KEY_ID) is None
                 or options.get(CatalogOptions.DLF_ACCESS_KEY_SECRET) is None):
             return None
diff --git a/paimon-python/pypaimon/catalog/catalog_utils.py b/paimon-python/pypaimon/catalog/catalog_utils.py
index 2bae27aa4e..9b030b9f02 100644
--- a/paimon-python/pypaimon/catalog/catalog_utils.py
+++ b/paimon-python/pypaimon/catalog/catalog_utils.py
@@ -18,7 +18,7 @@ limitations under the License.
 from pathlib import Path
 from typing import Callable, Any
 
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.identifier import Identifier
 from pypaimon.catalog.table_metadata import TableMetadata
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 6651eec317..fa31a93c53 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -22,9 +22,10 @@ from urllib.parse import urlparse
 
 from pypaimon import Catalog, Database, Table
 from pypaimon.api import CatalogOptions, Identifier
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.catalog.catalog_exception import TableNotExistException, DatabaseNotExistException, \
     TableAlreadyExistException, DatabaseAlreadyExistException
+from pypaimon.common.file_io import FileIO
 from pypaimon.schema.schema_manager import SchemaManager
 from pypaimon.table.file_store_table import FileStoreTable
 
@@ -35,7 +36,7 @@ class FileSystemCatalog(Catalog):
             raise ValueError(f"Paimon '{CatalogOptions.WAREHOUSE}' path must be set")
         self.warehouse = catalog_options.get(CatalogOptions.WAREHOUSE)
         self.catalog_options = catalog_options
-        self.file_io = None  # FileIO(self.warehouse, self.catalog_options)
+        self.file_io = FileIO(self.warehouse, self.catalog_options)
 
     def get_database(self, name: str) -> Database:
         if self.file_io.exists(self.get_database_path(name)):
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index ab08685840..540beb6353 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -21,7 +21,7 @@ from typing import List, Dict, Optional, Union
 
 from pypaimon import Database, Catalog, Schema
 from pypaimon.api import RESTApi, CatalogOptions
 from pypaimon.api.api_response import PagedList, GetTableResponse
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.identifier import Identifier
 from pypaimon.api.options import Options
diff --git a/paimon-python/pypaimon/api/config.py b/paimon-python/pypaimon/common/config.py
similarity index 87%
rename from paimon-python/pypaimon/api/config.py
rename to paimon-python/pypaimon/common/config.py
index 2a33182993..4bfcc418e7 100644
--- a/paimon-python/pypaimon/api/config.py
+++ b/paimon-python/pypaimon/common/config.py
@@ -20,6 +20,15 @@ class OssOptions:
     OSS_ACCESS_KEY_SECRET = "fs.oss.accessKeySecret"
     OSS_SECURITY_TOKEN = "fs.oss.securityToken"
     OSS_ENDPOINT = "fs.oss.endpoint"
+    OSS_REGION = "fs.oss.region"
+
+
+class S3Options:
+    S3_ACCESS_KEY_ID = "fs.s3.accessKeyId"
+    S3_ACCESS_KEY_SECRET = "fs.s3.accessKeySecret"
+    S3_SECURITY_TOKEN = "fs.s3.securityToken"
+    S3_ENDPOINT = "fs.s3.endpoint"
+    S3_REGION = "fs.s3.region"
 
 
 class CatalogOptions:
diff --git a/paimon-python/pypaimon/api/core_options.py b/paimon-python/pypaimon/common/core_options.py
similarity index 96%
copy from paimon-python/pypaimon/api/core_options.py
copy to paimon-python/pypaimon/common/core_options.py
index 18d2fba291..1aa4fc065f 100644
--- a/paimon-python/pypaimon/api/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -44,3 +44,5 @@ class CoreOptions(str, Enum):
     FILE_BLOCK_SIZE = "file.block-size"
     # Scan options
    SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
+    # commit options
+    COMMIT_USER_PREFIX = "commit.user-prefix"
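The relocated config module gains S3 counterparts of the OSS keys plus a region option for both. These constants are the keys of the catalog options dict that the new FileIO (next diff) consumes; a minimal sketch of wiring them up, with made-up bucket, endpoint, and credential values:

    from pypaimon.catalog.catalog_factory import CatalogFactory

    catalog = CatalogFactory.create({
        "warehouse": "oss://my-bucket/warehouse",           # placeholder bucket
        "fs.oss.endpoint": "oss-cn-hangzhou.aliyuncs.com",  # OssOptions.OSS_ENDPOINT
        "fs.oss.accessKeyId": "<access-key-id>",            # OssOptions.OSS_ACCESS_KEY_ID
        "fs.oss.accessKeySecret": "<access-key-secret>",    # OssOptions.OSS_ACCESS_KEY_SECRET
        "fs.oss.region": "cn-hangzhou",                     # new OssOptions.OSS_REGION
    })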
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index b58a936af4..c2d579eb53 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -15,43 +15,302 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from abc import ABC, abstractmethod
+
+import logging
+import os
+import subprocess
 from pathlib import Path
+from typing import Optional, Dict, Any, List
+from urllib.parse import urlparse, splitport
 
+import pyarrow
+from pyarrow._fs import FileSystem
 
-class FileIO(ABC):
-    @abstractmethod
-    def exists(self, path: Path) -> bool:
-        """"""
+from pypaimon.common.config import OssOptions, S3Options
 
-    @abstractmethod
-    def read_file_utf8(self, path: Path) -> str:
-        """"""
 
-    @abstractmethod
-    def try_to_write_atomic(self, path: Path, content: str) -> bool:
-        """"""
+class FileIO:
+    def __init__(self, warehouse: str, catalog_options: dict):
+        self.properties = catalog_options
+        self.logger = logging.getLogger(__name__)
+        scheme, netloc, path = self.parse_location(warehouse)
+        if scheme in {"oss"}:
+            self.filesystem = self._initialize_oss_fs()
+        elif scheme in {"s3", "s3a", "s3n"}:
+            self.filesystem = self._initialize_s3_fs()
+        elif scheme in {"hdfs", "viewfs"}:
+            self.filesystem = self._initialize_hdfs_fs(scheme, netloc)
+        elif scheme in {"file"}:
+            self.filesystem = self._initialize_local_fs()
+        else:
+            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
+
+    @staticmethod
+    def parse_location(location: str):
+        uri = urlparse(location)
+        if not uri.scheme:
+            return "file", uri.netloc, os.path.abspath(location)
+        elif uri.scheme in ("hdfs", "viewfs"):
+            return uri.scheme, uri.netloc, uri.path
+        else:
+            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
+
+    def _initialize_oss_fs(self) -> FileSystem:
+        from pyarrow.fs import S3FileSystem
+
+        client_kwargs = {
+            "endpoint_override": self.properties.get(OssOptions.OSS_ENDPOINT),
+            "access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID),
+            "secret_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET),
+            "session_token": self.properties.get(OssOptions.OSS_SECURITY_TOKEN),
+            "region": self.properties.get(OssOptions.OSS_REGION),
+            "force_virtual_addressing": True,
+        }
+
+        return S3FileSystem(**client_kwargs)
+
+    def _initialize_s3_fs(self) -> FileSystem:
+        from pyarrow.fs import S3FileSystem
+
+        client_kwargs = {
+            "endpoint_override": self.properties.get(S3Options.S3_ENDPOINT),
+            "access_key": self.properties.get(S3Options.S3_ACCESS_KEY_ID),
+            "secret_key": self.properties.get(S3Options.S3_ACCESS_KEY_SECRET),
+            "session_token": self.properties.get(S3Options.S3_SECURITY_TOKEN),
+            "region": self.properties.get(S3Options.S3_REGION),
+            "force_virtual_addressing": True,
+        }
+
+        return S3FileSystem(**client_kwargs)
+
+    def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import HadoopFileSystem
+
+        if 'HADOOP_HOME' not in os.environ:
+            raise RuntimeError("HADOOP_HOME environment variable is not set.")
+        if 'HADOOP_CONF_DIR' not in os.environ:
+            raise RuntimeError("HADOOP_CONF_DIR environment variable is not set.")
+
+        hadoop_home = os.environ.get("HADOOP_HOME")
+        native_lib_path = f"{hadoop_home}/lib/native"
+        os.environ['LD_LIBRARY_PATH'] = f"{native_lib_path}:{os.environ.get('LD_LIBRARY_PATH', '')}"
+
+        class_paths = subprocess.run(
+            [f'{hadoop_home}/bin/hadoop', 'classpath', '--glob'],
+            capture_output=True,
+            text=True,
+            check=True
+        )
+        os.environ['CLASSPATH'] = class_paths.stdout.strip()
+
+        host, port_str = splitport(netloc)
+        return HadoopFileSystem(
+            host=host,
+            port=int(port_str),
+            user=os.environ.get('HADOOP_USER_NAME', 'hadoop')
+        )
+    def _initialize_local_fs(self) -> FileSystem:
+        from pyarrow.fs import LocalFileSystem
+
+        return LocalFileSystem()
+
+    def new_input_stream(self, path: Path):
+        return self.filesystem.open_input_file(str(path))
+
+    def new_output_stream(self, path: Path):
+        parent_dir = path.parent
+        if str(parent_dir) and not self.exists(parent_dir):
+            self.mkdirs(parent_dir)
+
+        return self.filesystem.open_output_stream(str(path))
+
+    def get_file_status(self, path: Path):
+        file_infos = self.filesystem.get_file_info([str(path)])
+        return file_infos[0]
 
-    @abstractmethod
     def list_status(self, path: Path):
-        """"""
+        selector = pyarrow.fs.FileSelector(str(path), recursive=False, allow_not_found=True)
+        return self.filesystem.get_file_info(selector)
+
+    def list_directories(self, path: Path):
+        file_infos = self.list_status(path)
+        return [info for info in file_infos if info.type == pyarrow.fs.FileType.Directory]
+
+    def exists(self, path: Path) -> bool:
+        try:
+            file_info = self.filesystem.get_file_info([str(path)])[0]
+            return file_info.type != pyarrow.fs.FileType.NotFound
+        except Exception:
+            return False
+
+    def delete(self, path: Path, recursive: bool = False) -> bool:
+        try:
+            file_info = self.filesystem.get_file_info([str(path)])[0]
+            if file_info.type == pyarrow.fs.FileType.Directory:
+                if recursive:
+                    self.filesystem.delete_dir_contents(str(path))
+                else:
+                    self.filesystem.delete_dir(str(path))
+            else:
+                self.filesystem.delete_file(str(path))
+            return True
+        except Exception as e:
+            self.logger.warning(f"Failed to delete {path}: {e}")
+            return False
 
-    @abstractmethod
     def mkdirs(self, path: Path) -> bool:
-        """"""
+        try:
+            self.filesystem.create_dir(str(path), recursive=True)
+            return True
+        except Exception as e:
+            self.logger.warning(f"Failed to create directory {path}: {e}")
+            return False
 
-    @abstractmethod
-    def write_file(self, path: Path, content: str, overwrite: bool = False):
-        """"""
+    def rename(self, src: Path, dst: Path) -> bool:
+        try:
+            dst_parent = dst.parent
+            if str(dst_parent) and not self.exists(dst_parent):
+                self.mkdirs(dst_parent)
+
+            self.filesystem.move(str(src), str(dst))
+            return True
+        except Exception as e:
+            self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
+            return False
 
-    @abstractmethod
     def delete_quietly(self, path: Path):
-        """"""
+        if self.logger.isEnabledFor(logging.DEBUG):
+            self.logger.debug(f"Ready to delete {path}")
 
-    @abstractmethod
-    def new_input_stream(self, path: Path):
-        """"""
+        try:
+            if not self.delete(path, False) and self.exists(path):
+                self.logger.warning(f"Failed to delete file {path}")
+        except Exception:
+            self.logger.warning(f"Exception occurs when deleting file {path}", exc_info=True)
+
+    def delete_files_quietly(self, files: List[Path]):
+        for file_path in files:
+            self.delete_quietly(file_path)
+
+    def delete_directory_quietly(self, directory: Path):
+        if self.logger.isEnabledFor(logging.DEBUG):
+            self.logger.debug(f"Ready to delete {directory}")
+
+        try:
+            if not self.delete(directory, True) and self.exists(directory):
+                self.logger.warning(f"Failed to delete directory {directory}")
+        except Exception:
+            self.logger.warning(f"Exception occurs when deleting directory {directory}", exc_info=True)
+
+    def get_file_size(self, path: Path) -> int:
+        file_info = self.get_file_status(path)
+        if file_info.size is None:
+            raise ValueError(f"File size not available for {path}")
+        return file_info.size
+
+    def is_dir(self, path: Path) -> bool:
+        file_info = self.get_file_status(path)
+        return file_info.type == pyarrow.fs.FileType.Directory
+    def check_or_mkdirs(self, path: Path):
+        if self.exists(path):
+            if not self.is_dir(path):
+                raise ValueError(f"The path '{path}' should be a directory.")
+        else:
+            self.mkdirs(path)
+
+    def read_file_utf8(self, path: Path) -> str:
+        with self.new_input_stream(path) as input_stream:
+            return input_stream.read().decode('utf-8')
+
+    def try_to_write_atomic(self, path: Path, content: str) -> bool:
+        temp_path = path.with_suffix(path.suffix + ".tmp") if path.suffix else Path(str(path) + ".tmp")
+        success = False
+        try:
+            self.write_file(temp_path, content, False)
+            success = self.rename(temp_path, path)
+        finally:
+            if not success:
+                self.delete_quietly(temp_path)
+        return success
+
+    def write_file(self, path: Path, content: str, overwrite: bool = False):
+        with self.new_output_stream(path) as output_stream:
+            output_stream.write(content.encode('utf-8'))
+
+    def overwrite_file_utf8(self, path: Path, content: str):
+        with self.new_output_stream(path) as output_stream:
+            output_stream.write(content.encode('utf-8'))
+
+    def copy_file(self, source_path: Path, target_path: Path, overwrite: bool = False):
+        if not overwrite and self.exists(target_path):
+            raise FileExistsError(f"Target file {target_path} already exists and overwrite=False")
+
+        self.filesystem.copy_file(str(source_path), str(target_path))
+
+    def copy_files(self, source_directory: Path, target_directory: Path, overwrite: bool = False):
+        file_infos = self.list_status(source_directory)
+        for file_info in file_infos:
+            if file_info.type == pyarrow.fs.FileType.File:
+                source_file = Path(file_info.path)
+                target_file = target_directory / source_file.name
+                self.copy_file(source_file, target_file, overwrite)
+
+    def read_overwritten_file_utf8(self, path: Path) -> Optional[str]:
+        retry_number = 0
+        exception = None
+        while retry_number < 5:
+            try:
+                return self.read_file_utf8(path)
+            except FileNotFoundError:
+                return None
+            except Exception as e:
+                if not self.exists(path):
+                    return None
+
+                if (str(type(e).__name__).endswith("RemoteFileChangedException") or
+                        (str(e) and "Blocklist for" in str(e) and "has changed" in str(e))):
+                    exception = e
+                    retry_number += 1
+                else:
+                    raise e
+
+        if exception:
+            if isinstance(exception, Exception):
+                raise exception
+            else:
+                raise RuntimeError(exception)
+
+        return None
+
+    def write_parquet(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'snappy', **kwargs):
+        try:
+            import pyarrow.parquet as pq
+
+            with self.new_output_stream(path) as output_stream:
+                with pq.ParquetWriter(output_stream, data.schema, compression=compression, **kwargs) as pw:
+                    pw.write_batch(data)
+
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write Parquet file {path}: {e}") from e
+
+    def write_orc(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
+        try:
+            import pyarrow.orc as orc
+
+            with self.new_output_stream(path) as output_stream:
+                orc.write_table(
+                    data,
+                    output_stream,
+                    compression=compression,
+                    **kwargs
+                )
+
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e
 
-    @abstractmethod
-    def get_file_size(self, path: Path):
-        """"""
+    def write_avro(self, path: Path, table: pyarrow.RecordBatch, schema: Optional[Dict[str, Any]] = None, **kwargs):
+        raise ValueError("Unsupported write_avro yet")
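To try the rewritten FileIO in isolation, a small local-filesystem sketch (paths are made up; a location without a scheme resolves to LocalFileSystem):

    import pyarrow as pa
    from pathlib import Path
    from pypaimon.common.file_io import FileIO

    file_io = FileIO("/tmp/warehouse", {})              # no scheme -> local filesystem
    hint = Path("/tmp/warehouse/demo/LATEST")
    file_io.try_to_write_atomic(hint, "snapshot-1")     # writes LATEST.tmp, then renames
    assert file_io.read_file_utf8(hint) == "snapshot-1"

    batch = pa.RecordBatch.from_pydict({"id": pa.array([1, 2], type=pa.int32())})
    file_io.write_parquet(Path("/tmp/warehouse/demo/data-0.parquet"), batch)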
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index a70d0e2d46..6775d24bcd 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -23,7 +23,6 @@ from io import BytesIO
 
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry, MANIFEST_ENTRY_SCHEMA
-from pypaimon.table.file_store_table import FileStoreTable
 from pypaimon.table.row.binary_row import BinaryRowDeserializer, BinaryRowSerializer, BinaryRow
 
 
@@ -31,6 +30,8 @@ class ManifestFileManager:
     """Writer for manifest files in Avro format using unified FileIO."""
 
     def __init__(self, table):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
         self.manifest_path = table.table_path / "manifest"
         self.file_io = table.file_io
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index 969c6a05ef..1c58ea5b6a 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -24,14 +24,15 @@ from io import BytesIO
 
 from pypaimon.manifest.schema.manifest_file_meta import MANIFEST_FILE_META_SCHEMA
 from pypaimon.snapshot.snapshot import Snapshot
-from pypaimon.table.file_store_table import FileStoreTable
 
 
 class ManifestListManager:
     """Manager for manifest list files in Avro format using unified FileIO."""
 
-    def __init__(self, table: FileStoreTable):
-        self.table = table
+    def __init__(self, table):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
         self.manifest_path = self.table.table_path / "manifest"
         self.file_io = self.table.file_io
diff --git a/paimon-python/pypaimon/pvfs/__init__.py b/paimon-python/pypaimon/pvfs/__init__.py
index 2b425d0251..ec1cb0f05c 100644
--- a/paimon-python/pypaimon/pvfs/__init__.py
+++ b/paimon-python/pypaimon/pvfs/__init__.py
@@ -33,7 +33,7 @@ from fsspec.implementations.local import LocalFileSystem
 
 from pypaimon.api import RESTApi, GetTableTokenResponse, Schema, Identifier, GetTableResponse
 from pypaimon.api.client import NoSuchResourceException, AlreadyExistsException
-from pypaimon.api.config import CatalogOptions, OssOptions, PVFSOptions
+from pypaimon.common.config import CatalogOptions, OssOptions, PVFSOptions
 
 PROTOCOL_NAME = "pvfs"
diff --git a/paimon-python/pypaimon/read/read_builder.py b/paimon-python/pypaimon/read/read_builder.py
index 604eff9ff1..30f824a698 100644
--- a/paimon-python/pypaimon/read/read_builder.py
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -23,14 +23,15 @@ from pypaimon.common.predicate_builder import PredicateBuilder
 from pypaimon.read.table_read import TableRead
 from pypaimon.read.table_scan import TableScan
 from pypaimon.schema.data_types import DataField
-from pypaimon.table.file_store_table import FileStoreTable
 
 
 class ReadBuilder:
     """Implementation of ReadBuilder for native Python reading."""
 
-    def __init__(self, table: FileStoreTable):
-        self.table = table
+    def __init__(self, table):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
         self._predicate: Optional[Predicate] = None
         self._projection: Optional[List[str]] = None
         self._limit: Optional[int] = None
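The import churn above and below follows one pattern: modules that previously imported FileStoreTable at module scope now bind it inside __init__, since file_store_table itself now imports the write package and a top-level import would form a cycle. In isolation (class name hypothetical):

    class SomeManager:  # hypothetical stand-in for the managers/builders in this commit
        def __init__(self, table):
            # Deferred import: runs at construction time, not at module import time,
            # so importing this module no longer pulls in file_store_table.
            from pypaimon.table.file_store_table import FileStoreTable
            self.table: FileStoreTable = table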
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index fb56d70a82..27015d3eda 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -29,15 +29,16 @@ from pypaimon.read.plan import Plan
 from pypaimon.read.split import Split
 from pypaimon.schema.data_types import DataField
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
-from pypaimon.table.file_store_table import FileStoreTable
 
 
 class TableScan:
     """Implementation of TableScan for native Python reading."""
 
-    def __init__(self, table: FileStoreTable, predicate: Optional[Predicate], limit: Optional[int],
+    def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int],
                  read_type: List[DataField]):
-        self.table = table
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
         self.predicate = predicate
         self.predicate = predicate
         self.limit = limit
diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py
index 635cb3464d..736de902db 100644
--- a/paimon-python/pypaimon/schema/table_schema.py
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -27,7 +27,7 @@ import pyarrow
 from pypaimon import Schema
 from pypaimon.common.rest_json import json_field
 from pypaimon.schema import data_types
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.file_io import FileIO
 from pypaimon.schema.data_types import DataField
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index 9e500c48d2..6a8a68e73a 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -21,14 +21,15 @@ from typing import Optional
 
 from pypaimon.common.file_io import FileIO
 from pypaimon.snapshot.snapshot import Snapshot
-from pypaimon.table.file_store_table import FileStoreTable
 
 
 class SnapshotManager:
     """Manager for snapshot files using unified FileIO."""
 
-    def __init__(self, table: FileStoreTable):
-        self.table = table
+    def __init__(self, table):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
         self.file_io: FileIO = self.table.file_io
         self.snapshot_dir = self.table.table_path / "snapshot"
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index e1b8322fe8..225edd1cc4 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -19,12 +19,14 @@
 from pathlib import Path
 
 from pypaimon import Table
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.identifier import Identifier
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.common.file_io import FileIO
 from pypaimon.schema.schema_manager import SchemaManager
 from pypaimon.table.bucket_mode import BucketMode
+from pypaimon.write.batch_write_builder import BatchWriteBuilder
+from pypaimon.write.row_key_extractor import RowKeyExtractor, FixedBucketRowKeyExtractor, UnawareBucketRowKeyExtractor
 
 
 class FileStoreTable(Table):
@@ -56,3 +58,20 @@ class FileStoreTable(Table):
             return BucketMode.BUCKET_UNAWARE
         else:
             return BucketMode.HASH_FIXED
+
+    def new_read_builder(self) -> 'ReadBuilder':
+        pass
+
+    def new_batch_write_builder(self) -> BatchWriteBuilder:
+        return BatchWriteBuilder(self)
+
+    def create_row_key_extractor(self) -> RowKeyExtractor:
+        bucket_mode = self.bucket_mode()
+        if bucket_mode == BucketMode.HASH_FIXED:
+            return FixedBucketRowKeyExtractor(self.table_schema)
+        elif bucket_mode == BucketMode.BUCKET_UNAWARE:
+            return UnawareBucketRowKeyExtractor(self.table_schema)
+        elif bucket_mode == BucketMode.HASH_DYNAMIC or bucket_mode == BucketMode.CROSS_PARTITION:
+            raise ValueError(f"Unsupported bucket mode {bucket_mode} yet")
+        else:
+            raise ValueError(f"Unsupported bucket mode: {bucket_mode}")
diff --git a/paimon-python/pypaimon/table/table.py b/paimon-python/pypaimon/table/table.py
index 6f15b049ae..3a1fe2e622 100644
--- a/paimon-python/pypaimon/table/table.py
+++ b/paimon-python/pypaimon/table/table.py
@@ -16,8 +16,19 @@
 #  limitations under the License.
 ################################################################################
 
-from abc import ABC
+from abc import ABC, abstractmethod
+
+from pypaimon.read.read_builder import ReadBuilder
+from pypaimon.write.batch_write_builder import BatchWriteBuilder
 
 
 class Table(ABC):
     """A table provides basic abstraction for table read and write."""
+
+    @abstractmethod
+    def new_read_builder(self) -> ReadBuilder:
+        """Return a builder for building table scan and table read."""
+
+    @abstractmethod
+    def new_batch_write_builder(self) -> BatchWriteBuilder:
+        """Returns a builder for building batch table write and table commit."""
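Together these hooks form the new batch write path; in outline (names as in writer_test.py below, which is the runnable version):

    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    table_write.write_arrow(arrow_table)               # fan rows out to (partition, bucket) writers
    table_commit.commit(table_write.prepare_commit())  # writes manifests, then snapshot-1
    table_write.close()
    table_commit.close()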
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
new file mode 100644
index 0000000000..5b0dc81b2f
--- /dev/null
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import shutil
+import tempfile
+import unittest
+
+from pypaimon import Schema
+from pypaimon.catalog.catalog_exception import DatabaseAlreadyExistException, TableAlreadyExistException, \
+    DatabaseNotExistException, TableNotExistException
+from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.file_store_table import FileStoreTable
+
+
+class FileSystemCatalogTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.temp_dir = tempfile.mkdtemp(prefix="unittest_")
+        self.warehouse = os.path.join(self.temp_dir, 'test_dir')
+
+    def tearDown(self):
+        shutil.rmtree(self.temp_dir, ignore_errors=True)
+
+    def test_database(self):
+        catalog = CatalogFactory.create({
+            "warehouse": self.warehouse
+        })
+        catalog.create_database("test_db", False)
+        self.assertTrue(os.path.exists(self.warehouse + "/test_db.db"))
+
+        with self.assertRaises(DatabaseAlreadyExistException):
+            catalog.create_database("test_db", False)
+
+        catalog.create_database("test_db", True)
+
+        with self.assertRaises(DatabaseNotExistException):
+            catalog.get_database("test_db_x")
+
+        database = catalog.get_database("test_db")
+        self.assertEqual(database.name, "test_db")
+
+    def test_table(self):
+        fields = [
+            DataField.from_dict({"id": 1, "name": "f0", "type": "INT"}),
+            DataField.from_dict({"id": 2, "name": "f1", "type": "INT"}),
+            DataField.from_dict({"id": 3, "name": "f2", "type": "STRING"}),
+        ]
+        catalog = CatalogFactory.create({
+            "warehouse": self.warehouse
+        })
+        catalog.create_database("test_db", False)
+        catalog.create_table("test_db.test_table", Schema(fields=fields), False)
+        self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/schema/schema-0"))
+
+        with self.assertRaises(TableAlreadyExistException):
+            catalog.create_table("test_db.test_table", Schema(fields=fields), False)
+
+        catalog.create_table("test_db.test_table", Schema(fields=fields), True)
+
+        database = catalog.get_database("test_db")
+        self.assertEqual(database.name, "test_db")
+
+        with self.assertRaises(TableNotExistException):
+            catalog.get_table("test_db.test_table_x")
+
+        table = catalog.get_table("test_db.test_table")
+        self.assertTrue(table is not None)
+        self.assertTrue(isinstance(table, FileStoreTable))
diff --git a/paimon-python/pypaimon/tests/writer_test.py b/paimon-python/pypaimon/tests/writer_test.py
new file mode 100644
index 0000000000..9a8230b0eb
--- /dev/null
+++ b/paimon-python/pypaimon/tests/writer_test.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import glob
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow
+
+from pypaimon import Schema
+from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.schema.data_types import DataField
+
+
+class WriterTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.temp_dir = tempfile.mkdtemp(prefix="unittest_")
+        self.warehouse = os.path.join(self.temp_dir, 'test_dir')
+
+    def tearDown(self):
+        shutil.rmtree(self.temp_dir, ignore_errors=True)
+
+    def test_writer(self):
+        pa_schema = pyarrow.schema([
+            ('f0', pyarrow.int32()),
+            ('f1', pyarrow.string()),
+            ('f2', pyarrow.string())
+        ])
+        fields = [
+            DataField.from_dict({"id": 1, "name": "f0", "type": "INT"}),
+            DataField.from_dict({"id": 2, "name": "f1", "type": "STRING"}),
+            DataField.from_dict({"id": 3, "name": "f2", "type": "STRING"}),
+        ]
+        catalog = CatalogFactory.create({
+            "warehouse": self.warehouse
+        })
+        catalog.create_database("test_db", False)
+        catalog.create_table("test_db.test_table", Schema(fields=fields), False)
+        table = catalog.get_table("test_db.test_table")
+
+        data = {
+            'f0': [1, 2, 3],
+            'f1': ['a', 'b', 'c'],
+            'f2': ['X', 'Y', 'Z']
+        }
+        expect = pyarrow.Table.from_pydict(data, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(expect)
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+        table_write.close()
+        table_commit.close()
+
+        self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/snapshot/LATEST"))
+        self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/snapshot/snapshot-1"))
+        self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/manifest"))
+        self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/bucket-0"))
+        self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/manifest/*.avro")), 2)
+        self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/bucket-0/*.parquet")), 1)
diff --git a/paimon-python/pypaimon/write/batch_table_commit.py b/paimon-python/pypaimon/write/batch_table_commit.py
new file mode 100644
index 0000000000..14849e6309
--- /dev/null
+++ b/paimon-python/pypaimon/write/batch_table_commit.py
@@ -0,0 +1,72 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import time
+from typing import List, Optional
+
+from pypaimon.write.commit_message import CommitMessage
+from pypaimon.write.file_store_commit import FileStoreCommit
+
+
+class BatchTableCommit:
+    """Python implementation of BatchTableCommit for batch writing scenarios."""
+
+    def __init__(self, table, commit_user: str, static_partition: Optional[dict]):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
+        self.commit_user = commit_user
+        self.overwrite_partition = static_partition
+        self.file_store_commit = FileStoreCommit(table, commit_user)
+        self.batch_committed = False
+
+    def commit(self, commit_messages: List[CommitMessage]):
+        self._check_committed()
+
+        non_empty_messages = [msg for msg in commit_messages if not msg.is_empty()]
+        if not non_empty_messages:
+            return
+
+        commit_identifier = int(time.time() * 1000)
+
+        try:
+            if self.overwrite_partition is not None:
+                self.file_store_commit.overwrite(
+                    partition=self.overwrite_partition,
+                    commit_messages=non_empty_messages,
+                    commit_identifier=commit_identifier
+                )
+            else:
+                self.file_store_commit.commit(
+                    commit_messages=non_empty_messages,
+                    commit_identifier=commit_identifier
+                )
+        except Exception as e:
+            self.file_store_commit.abort(commit_messages)
+            raise RuntimeError(f"Failed to commit: {str(e)}") from e
+
+    def abort(self, commit_messages: List[CommitMessage]):
+        self.file_store_commit.abort(commit_messages)
+
+    def close(self):
+        self.file_store_commit.close()
+
+    def _check_committed(self):
+        if self.batch_committed:
+            raise RuntimeError("BatchTableCommit only supports one-time committing.")
+        self.batch_committed = True
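Note the one-shot contract enforced by _check_committed(): a BatchTableCommit cannot be reused. A sketch of what callers should expect (write_builder and messages as in the tests above):

    commit = write_builder.new_commit()
    commit.commit(messages)   # first call succeeds
    commit.commit(messages)   # raises RuntimeError: BatchTableCommit only
                              # supports one-time committing.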
diff --git a/paimon-python/pypaimon/write/batch_table_write.py b/paimon-python/pypaimon/write/batch_table_write.py
new file mode 100644
index 0000000000..921de1151f
--- /dev/null
+++ b/paimon-python/pypaimon/write/batch_table_write.py
@@ -0,0 +1,63 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pyarrow as pa
+from collections import defaultdict
+
+from typing import List
+
+from pypaimon.write.commit_message import CommitMessage
+from pypaimon.write.file_store_write import FileStoreWrite
+
+
+class BatchTableWrite:
+    def __init__(self, table):
+        self.file_store_write = FileStoreWrite(table)
+        self.row_key_extractor = table.create_row_key_extractor()
+        self.batch_committed = False
+
+    def write_arrow(self, table: pa.Table, row_kind: List[int] = None):
+        # TODO: support row_kind
+        batches_iterator = table.to_batches()
+        for batch in batches_iterator:
+            self.write_arrow_batch(batch)
+
+    def write_arrow_batch(self, data: pa.RecordBatch, row_kind: List[int] = None):
+        # TODO: support row_kind
+        partitions, buckets = self.row_key_extractor.extract_partition_bucket_batch(data)
+
+        partition_bucket_groups = defaultdict(list)
+        for i in range(data.num_rows):
+            partition_bucket_groups[(tuple(partitions[i]), buckets[i])].append(i)
+
+        for (partition, bucket), row_indices in partition_bucket_groups.items():
+            indices_array = pa.array(row_indices, type=pa.int64())
+            sub_table = pa.compute.take(data, indices_array)
+            self.file_store_write.write(partition, bucket, sub_table)
+
+    def write_pandas(self, dataframe):
+        raise ValueError("Not implemented yet")
+
+    def prepare_commit(self) -> List[CommitMessage]:
+        if self.batch_committed:
+            raise RuntimeError("BatchTableWrite only supports one-time committing.")
+        self.batch_committed = True
+        return self.file_store_write.prepare_commit()
+
+    def close(self):
+        self.file_store_write.close()
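The per-(partition, bucket) grouping in write_arrow_batch can be tried standalone; the partitions/buckets lists below are hard-coded stand-ins for what the row key extractor returns:

    import pyarrow as pa
    from collections import defaultdict

    data = pa.RecordBatch.from_pydict({"dt": ["a", "b", "a"], "v": [1, 2, 3]})
    partitions = [("a",), ("b",), ("a",)]   # stand-in for extractor output
    buckets = [0, 0, 0]

    groups = defaultdict(list)
    for i in range(data.num_rows):
        groups[(partitions[i], buckets[i])].append(i)

    for (partition, bucket), rows in groups.items():
        sub = data.take(pa.array(rows, type=pa.int64()))  # slice handed to one DataWriter
        print(partition, bucket, sub.num_rows)            # ('a',) 0 2, then ('b',) 0 1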
diff --git a/paimon-python/pypaimon/write/batch_write_builder.py b/paimon-python/pypaimon/write/batch_write_builder.py
new file mode 100644
index 0000000000..e23455f3fc
--- /dev/null
+++ b/paimon-python/pypaimon/write/batch_write_builder.py
@@ -0,0 +1,51 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import uuid
+
+from typing import Optional
+
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.write.batch_table_commit import BatchTableCommit
+from pypaimon.write.batch_table_write import BatchTableWrite
+
+
+class BatchWriteBuilder:
+    def __init__(self, table):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
+        self.commit_user = self._create_commit_user()
+        self.static_partition = None
+
+    def overwrite(self, static_partition: Optional[dict] = None):
+        self.static_partition = static_partition
+        return self
+
+    def new_write(self) -> BatchTableWrite:
+        return BatchTableWrite(self.table)
+
+    def new_commit(self) -> BatchTableCommit:
+        commit = BatchTableCommit(self.table, self.commit_user, self.static_partition)
+        return commit
+
+    def _create_commit_user(self):
+        if CoreOptions.COMMIT_USER_PREFIX in self.table.options:
+            return f"{self.table.options.get(CoreOptions.COMMIT_USER_PREFIX)}_{uuid.uuid4()}"
+        else:
+            return str(uuid.uuid4())
diff --git a/paimon-python/pypaimon/api/core_options.py b/paimon-python/pypaimon/write/commit_message.py
similarity index 55%
rename from paimon-python/pypaimon/api/core_options.py
rename to paimon-python/pypaimon/write/commit_message.py
index 18d2fba291..fc36f13e5b 100644
--- a/paimon-python/pypaimon/api/core_options.py
+++ b/paimon-python/pypaimon/write/commit_message.py
@@ -16,31 +16,30 @@
 # limitations under the License.
 ################################################################################
 
-from enum import Enum
-
-
-class CoreOptions(str, Enum):
-    """Core options for paimon."""
-
-    def __str__(self):
-        return self.value
-
-    # Basic options
-    AUTO_CREATE = "auto-create"
-    PATH = "path"
-    TYPE = "type"
-    BRANCH = "branch"
-    BUCKET = "bucket"
-    BUCKET_KEY = "bucket-key"
-    WAREHOUSE = "warehouse"
-    # File format options
-    FILE_FORMAT = "file.format"
-    FILE_FORMAT_ORC = "orc"
-    FILE_FORMAT_AVRO = "avro"
-    FILE_FORMAT_PARQUET = "parquet"
-    FILE_COMPRESSION = "file.compression"
-    FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
-    FILE_FORMAT_PER_LEVEL = "file.format.per.level"
-    FILE_BLOCK_SIZE = "file.block-size"
-    # Scan options
-    SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
+from typing import Tuple, List
+
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+
+
+class CommitMessage:
+    """Python implementation of CommitMessage"""
+
+    def __init__(self, partition: Tuple, bucket: int, new_files: List[DataFileMeta]):
+        self._partition = partition
+        self._bucket = bucket
+        self._new_files = new_files or []
+
+    def partition(self) -> Tuple:
+        """Get the partition of this commit message."""
+        return self._partition
+
+    def bucket(self) -> int:
+        """Get the bucket of this commit message."""
+        return self._bucket
+
+    def new_files(self) -> List[DataFileMeta]:
+        """Get the list of new files."""
+        return self._new_files
+
+    def is_empty(self):
+        return not self._new_files
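Two builder behaviors worth calling out: overwrite() only switches the commit to OVERWRITE when a dict is passed (the default None falls through to the append path in BatchTableCommit), and the new 'commit.user-prefix' option prefixes the generated commit user. Hypothetical usage ('dt' is a made-up partition column):

    builder = table.new_batch_write_builder()
    builder.overwrite({"dt": "20250801"})  # static-partition overwrite
    builder.overwrite({})                  # empty dict -> overwrite the whole table
    builder.overwrite()                    # None -> commit() takes the normal append path
    # With table option 'commit.user-prefix' = 'etl', commit users become 'etl_<uuid4>'.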
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
new file mode 100644
index 0000000000..4928167907
--- /dev/null
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -0,0 +1,120 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import time
+from pathlib import Path
+from typing import List
+
+from pypaimon.manifest.manifest_file_manager import ManifestFileManager
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.write.commit_message import CommitMessage
+
+
+class FileStoreCommit:
+    """Core commit logic for file store operations."""
+
+    def __init__(self, table, commit_user: str):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
+        self.commit_user = commit_user
+
+        self.snapshot_manager = SnapshotManager(table)
+        self.manifest_file_manager = ManifestFileManager(table)
+        self.manifest_list_manager = ManifestListManager(table)
+
+        self.manifest_target_size = 8 * 1024 * 1024
+        self.manifest_merge_min_count = 30
+
+    def commit(self, commit_messages: List[CommitMessage], commit_identifier: int):
+        """Commit the given commit messages in normal append mode."""
+        if not commit_messages:
+            return
+
+        new_manifest_files = self.manifest_file_manager.write(commit_messages)
+        if not new_manifest_files:
+            return
+        latest_snapshot = self.snapshot_manager.get_latest_snapshot()
+        existing_manifest_files = []
+        if latest_snapshot:
+            existing_manifest_files = self.manifest_list_manager.read_all_manifest_files(latest_snapshot)
+        new_manifest_files.extend(existing_manifest_files)
+        manifest_list = self.manifest_list_manager.write(new_manifest_files)
+
+        new_snapshot_id = self._generate_snapshot_id()
+        snapshot_data = Snapshot(
+            version=3,
+            id=new_snapshot_id,
+            schema_id=0,
+            base_manifest_list=manifest_list,
+            delta_manifest_list=manifest_list,
+            commit_user=self.commit_user,
+            commit_identifier=commit_identifier,
+            commit_kind="APPEND",
+            time_millis=int(time.time() * 1000),
+            log_offsets={},
+        )
+        self.snapshot_manager.commit_snapshot(new_snapshot_id, snapshot_data)
+
+    def overwrite(self, partition, commit_messages: List[CommitMessage], commit_identifier: int):
+        if not commit_messages:
+            return
+
+        new_manifest_files = self.manifest_file_manager.write(commit_messages)
+        if not new_manifest_files:
+            return
+
+        # In overwrite mode, we don't merge with existing manifests
+        manifest_list = self.manifest_list_manager.write(new_manifest_files)
+
+        new_snapshot_id = self._generate_snapshot_id()
+        snapshot_data = Snapshot(
+            version=3,
+            id=new_snapshot_id,
+            schema_id=0,
+            base_manifest_list=manifest_list,
+            delta_manifest_list=manifest_list,
+            commit_user=self.commit_user,
+            commit_identifier=commit_identifier,
+            commit_kind="OVERWRITE",
+            time_millis=int(time.time() * 1000),
+            log_offsets={},
+        )
+        self.snapshot_manager.commit_snapshot(new_snapshot_id, snapshot_data)
+    def abort(self, commit_messages: List[CommitMessage]):
+        for message in commit_messages:
+            for file in message.new_files():
+                try:
+                    file_path_obj = Path(file.file_path)
+                    if file_path_obj.exists():
+                        file_path_obj.unlink()
+                except Exception as e:
+                    print(f"Warning: Failed to clean up file {file.file_path}: {e}")
+
+    def close(self):
+        pass
+
+    def _generate_snapshot_id(self) -> int:
+        latest_snapshot = self.snapshot_manager.get_latest_snapshot()
+        if latest_snapshot:
+            return latest_snapshot.id + 1
+        else:
+            return 1
diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
new file mode 100644
index 0000000000..d1002bc094
--- /dev/null
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -0,0 +1,75 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pyarrow as pa
+from typing import Dict, Tuple, List
+
+from pypaimon.write.commit_message import CommitMessage
+from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
+from pypaimon.write.writer.data_writer import DataWriter
+from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
+
+
+class FileStoreWrite:
+    """Base class for file store write operations."""
+
+    def __init__(self, table):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
+        self.data_writers: Dict[Tuple, DataWriter] = {}
+
+    def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
+        key = (partition, bucket)
+        if key not in self.data_writers:
+            self.data_writers[key] = self._create_data_writer(partition, bucket)
+        writer = self.data_writers[key]
+        writer.write(data)
+
+    def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter:
+        if self.table.is_primary_key_table:
+            return KeyValueDataWriter(
+                table=self.table,
+                partition=partition,
+                bucket=bucket,
+            )
+        else:
+            return AppendOnlyDataWriter(
+                table=self.table,
+                partition=partition,
+                bucket=bucket,
+            )
+
+    def prepare_commit(self) -> List[CommitMessage]:
+        commit_messages = []
+        for (partition, bucket), writer in self.data_writers.items():
+            committed_files = writer.prepare_commit()
+            if committed_files:
+                commit_message = CommitMessage(
+                    partition=partition,
+                    bucket=bucket,
+                    new_files=committed_files
+                )
+                commit_messages.append(commit_message)
+        return commit_messages
+
+    def close(self):
+        """Close all data writers and clean up resources."""
+        for writer in self.data_writers.values():
+            writer.close()
+        self.data_writers.clear()
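For completeness, a sketch of the lower-level flow that the builder wraps, normally reached only through BatchTableWrite/BatchTableCommit ('table' and 'batch' assumed as in the tests; untested illustration):

    from pypaimon.write.file_store_commit import FileStoreCommit
    from pypaimon.write.file_store_write import FileStoreWrite

    fsw = FileStoreWrite(table)
    fsw.write((), 0, batch)           # unpartitioned table -> partition (), bucket 0
    messages = fsw.prepare_commit()   # one CommitMessage per (partition, bucket)
    FileStoreCommit(table, "demo-user").commit(messages, commit_identifier=1)
    fsw.close()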
diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py
new file mode 100644
index 0000000000..cda3ad07ba
--- /dev/null
+++ b/paimon-python/pypaimon/write/row_key_extractor.py
@@ -0,0 +1,102 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pyarrow as pa
+from typing import Tuple, List
+from abc import ABC, abstractmethod
+
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.schema.table_schema import TableSchema
+
+
+class RowKeyExtractor(ABC):
+    """Base class for extracting partition and bucket information from PyArrow data."""
+
+    def __init__(self, table_schema: TableSchema):
+        self.table_schema = table_schema
+        self.partition_indices = self._get_field_indices(table_schema.partition_keys)
+
+    def extract_partition_bucket_batch(self, data: pa.RecordBatch) -> Tuple[List[Tuple], List[int]]:
+        partitions = self._extract_partitions_batch(data)
+        buckets = self._extract_buckets_batch(data)
+        return partitions, buckets
+
+    def _get_field_indices(self, field_names: List[str]) -> List[int]:
+        if not field_names:
+            return []
+        field_map = {field.name: i for i, field in enumerate(self.table_schema.fields)}
+        return [field_map[name] for name in field_names if name in field_map]
+
+    def _extract_partitions_batch(self, data: pa.RecordBatch) -> List[Tuple]:
+        if not self.partition_indices:
+            return [() for _ in range(data.num_rows)]
+
+        partition_columns = [data.column(i) for i in self.partition_indices]
+
+        partitions = []
+        for row_idx in range(data.num_rows):
+            partition_values = tuple(col[row_idx].as_py() for col in partition_columns)
+            partitions.append(partition_values)
+
+        return partitions
+
+    @abstractmethod
+    def _extract_buckets_batch(self, table: pa.RecordBatch) -> List[int]:
+        """Extract bucket numbers for all rows. Must be implemented by subclasses."""
+        pass
+
+
+class FixedBucketRowKeyExtractor(RowKeyExtractor):
+    """Fixed bucket mode extractor with configurable number of buckets."""
+
+    def __init__(self, table_schema: TableSchema):
+        super().__init__(table_schema)
+        self.num_buckets = table_schema.options.get(CoreOptions.BUCKET, -1)
+        if self.num_buckets <= 0:
+            raise ValueError(f"Fixed bucket mode requires bucket > 0, got {self.num_buckets}")
+
+        bucket_key_option = table_schema.options.get(CoreOptions.BUCKET_KEY, '')
+        if bucket_key_option.strip():
+            self.bucket_keys = [k.strip() for k in bucket_key_option.split(',')]
+        else:
+            self.bucket_keys = [pk for pk in table_schema.primary_keys
+                                if pk not in table_schema.partition_keys]
+
+        self.bucket_key_indices = self._get_field_indices(self.bucket_keys)
+
+    def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]:
+        columns = [data.column(i) for i in self.bucket_key_indices]
+        hashes = []
+        for row_idx in range(data.num_rows):
+            row_values = tuple(col[row_idx].as_py() for col in columns)
+            hashes.append(hash(row_values))
+        return [abs(hash_val) % self.num_buckets for hash_val in hashes]
+
+
+class UnawareBucketRowKeyExtractor(RowKeyExtractor):
+    """Extractor for unaware bucket mode (bucket = -1, no primary keys)."""
+
+    def __init__(self, table_schema: TableSchema):
+        super().__init__(table_schema)
+        num_buckets = table_schema.options.get(CoreOptions.BUCKET, -1)
+
+        if num_buckets != -1:
+            raise ValueError(f"Unaware bucket mode requires bucket = -1, got {num_buckets}")
+
+    def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]:
+        return [0] * data.num_rows
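Fixed-bucket assignment reduces to abs(hash(bucket_key_tuple)) % num_buckets. For one row of a table with 'bucket' = '4' and bucket key f0:

    row_values = (42,)                  # bucket key tuple (f0 = 42)
    bucket = abs(hash(row_values)) % 4  # deterministic for ints; str hashes vary
                                        # per process unless PYTHONHASHSEED is pinned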
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 78de30a459..a661bd7543 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -23,17 +23,18 @@ from typing import Tuple, Optional, List
 from pathlib import Path
 from abc import ABC, abstractmethod
 
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
-from pypaimon.table.file_store_table import FileStoreTable
 from pypaimon.table.row.binary_row import BinaryRow
 
 
 class DataWriter(ABC):
     """Base class for data writers that handle PyArrow tables directly."""
 
-    def __init__(self, table: FileStoreTable, partition: Tuple, bucket: int):
-        self.table = table
+    def __init__(self, table, partition: Tuple, bucket: int):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
         self.partition = partition
         self.bucket = bucket
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index 9b10236e14..d4b93ee987 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -18,7 +18,7 @@
 
 import pyarrow as pa
 import pyarrow.compute as pc
-from typing import Tuple, Dict
+from typing import Tuple
 
 from pypaimon.write.writer.data_writer import DataWriter
 
 
@@ -26,12 +26,10 @@ from pypaimon.write.writer.data_writer import DataWriter
 class KeyValueDataWriter(DataWriter):
     """Data writer for primary key tables with system fields and sorting."""
 
-    def __init__(self, partition: Tuple, bucket: int, file_io, table_schema, table_identifier,
-                 target_file_size: int, options: Dict[str, str]):
-        super().__init__(partition, bucket, file_io, table_schema, table_identifier,
-                         target_file_size, options)
+    def __init__(self, table, partition: Tuple, bucket: int):
+        super().__init__(table, partition, bucket)
         self.sequence_generator = SequenceGenerator()
-        self.trimmed_primary_key = [field.name for field in self.table.table_schema.get_trimmed_primary_key_fields()]
+        self.trimmed_primary_key = [field.name for field in self.trimmed_primary_key_fields]
 
     def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
         enhanced_data = self._add_system_fields(data)
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 499c6ab8dc..8fa06e11bc 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -27,6 +27,7 @@ install_requires = [
     'cachetools==5.3.3',
     'ossfs==2023.12.0',
     'fastavro==1.11.1',
+    'pyarrow==15.0.2'
 ]
 
 long_description = "See Apache Paimon Python API \