This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f08c561409 [python] light refactor datasource modules to decouple optional dependencies (#7086)
f08c561409 is described below
commit f08c561409a4349291ab163f99a7a09b8d1adfc1
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Jan 21 13:34:21 2026 +0800
[python] light refactor datasource modules to decouple optional dependencies (#7086)
---
paimon-python/dev/requirements.txt | 2 -
paimon-python/pypaimon/read/datasource/__init__.py | 25 ++++
.../ray_datasource.py} | 128 +------------------
.../pypaimon/read/datasource/torch_dataset.py | 142 +++++++++++++++++++++
paimon-python/pypaimon/read/table_read.py | 6 +-
paimon-python/setup.py | 13 +-
6 files changed, 183 insertions(+), 133 deletions(-)
diff --git a/paimon-python/dev/requirements.txt b/paimon-python/dev/requirements.txt
index 993cb46a25..aa922eca16 100644
--- a/paimon-python/dev/requirements.txt
+++ b/paimon-python/dev/requirements.txt
@@ -35,9 +35,7 @@ pyarrow>=16,<20; python_version >= "3.8"
pylance>=0.20,<1; python_version>="3.9"
pylance>=0.10,<1; python_version>="3.8" and python_version<"3.9"
pyroaring
-ray>=2.10,<3; python_version>="3.7"
readerwriterlock>=1,<2
-torch
zstandard>=0.19,<1
cramjam>=1.3.0,<3; python_version>="3.7"
faiss-cpu==1.7.2; python_version >= "3.6" and python_version < "3.7"
diff --git a/paimon-python/pypaimon/read/datasource/__init__.py b/paimon-python/pypaimon/read/datasource/__init__.py
new file mode 100644
index 0000000000..b3a59c22f9
--- /dev/null
+++ b/paimon-python/pypaimon/read/datasource/__init__.py
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""
+Datasource implementations for reading Paimon tables into various data processing frameworks.
+
+This package provides optional integrations with:
+- Ray: for distributed data processing (ray_datasource.py)
+- PyTorch: for machine learning training pipelines (torch_dataset.py)
+
+"""
diff --git a/paimon-python/pypaimon/read/datasource.py b/paimon-python/pypaimon/read/datasource/ray_datasource.py
similarity index 67%
rename from paimon-python/pypaimon/read/datasource.py
rename to paimon-python/pypaimon/read/datasource/ray_datasource.py
index 835effbf0b..7f78fbad6f 100644
--- a/paimon-python/pypaimon/read/datasource.py
+++ b/paimon-python/pypaimon/read/datasource/ray_datasource.py
@@ -27,7 +27,7 @@ from typing import List, Optional, Iterable
import pyarrow
from packaging.version import parse
import ray
-import torch
+from ray.data.datasource import Datasource
from pypaimon.read.split import Split
from pypaimon.read.table_read import TableRead
@@ -39,10 +39,6 @@ logger = logging.getLogger(__name__)
RAY_VERSION_SCHEMA_IN_READ_TASK = "2.48.0" # Schema moved from BlockMetadata to ReadTask
RAY_VERSION_PER_TASK_ROW_LIMIT = "2.52.0" # per_task_row_limit parameter introduced
-from ray.data.datasource import Datasource
-
-from torch.utils.data import Dataset, IterableDataset
-
class RayDatasource(Datasource):
"""
@@ -73,7 +69,6 @@ class RayDatasource(Datasource):
if not self.splits:
return 0
- # Sum up file sizes from all splits
total_size = sum(split.file_size for split in self.splits)
return total_size if total_size > 0 else None
@@ -110,15 +105,12 @@ class RayDatasource(Datasource):
per_task_row_limit = kwargs.get('per_task_row_limit', None)
- # Validate parallelism parameter
if parallelism < 1:
raise ValueError(f"parallelism must be at least 1, got
{parallelism}")
- # Get schema for metadata
if self._schema is None:
self._schema = PyarrowFieldParser.from_paimon_schema(self.table_read.read_type)
- # Adjust parallelism if it exceeds the number of splits
if parallelism > len(self.splits):
parallelism = len(self.splits)
logger.warning(
@@ -145,14 +137,11 @@ class RayDatasource(Datasource):
from pypaimon.read.table_read import TableRead
worker_table_read = TableRead(table, predicate, read_type)
- # Read all splits in this chunk
arrow_table = worker_table_read.to_arrow(splits)
- # Return as a list to allow Ray to split into multiple blocks if needed
if arrow_table is not None and arrow_table.num_rows > 0:
return [arrow_table]
else:
- # Return empty table with correct schema
empty_table = pyarrow.Table.from_arrays(
[pyarrow.array([], type=field.type) for field in schema],
schema=schema
@@ -229,118 +218,3 @@ class RayDatasource(Datasource):
read_tasks.append(ReadTask(**read_task_kwargs))
return read_tasks
-
-
-class TorchDataset(Dataset):
- """
- PyTorch Dataset implementation for reading Paimon table data.
-
- This class enables Paimon table data to be used directly with PyTorch's
- training pipeline, allowing for efficient data loading and batching.
- """
-
- def __init__(self, table_read: TableRead, splits: List[Split]):
- """
- Initialize TorchDataset.
-
- Args:
- table_read: TableRead instance for reading data
- splits: List of splits to read
- """
- arrow_table = table_read.to_arrow(splits)
- if arrow_table is None or arrow_table.num_rows == 0:
- self._data = []
- else:
- self._data = arrow_table.to_pylist()
-
- def __len__(self) -> int:
- """
- Return the total number of rows in the dataset.
-
- Returns:
- Total number of rows across all splits
- """
- return len(self._data)
-
- def __getitem__(self, index: int):
- """
- Get a single item from the dataset.
-
- Args:
- index: Index of the item to retrieve
-
- Returns:
- Dictionary containing the row data
- """
- if not self._data:
- return None
-
- return self._data[index]
-
-
-class TorchIterDataset(IterableDataset):
- """
- PyTorch IterableDataset implementation for reading Paimon table data.
-
- This class enables streaming data loading from Paimon tables, which is more
- memory-efficient for large datasets. Data is read on-the-fly as needed,
- rather than loading everything into memory upfront.
- """
-
- def __init__(self, table_read: TableRead, splits: List[Split]):
- """
- Initialize TorchIterDataset.
-
- Args:
- table_read: TableRead instance for reading data
- splits: List of splits to read
- """
- self.table_read = table_read
- self.splits = splits
- # Get field names from read_type
- self.field_names = [field.name for field in table_read.read_type]
-
- def __iter__(self):
- """
- Iterate over the dataset, converting each OffsetRow to a dictionary.
-
- Supports multi-worker data loading by partitioning splits across workers.
- When num_workers > 0 in DataLoader, each worker will process a subset of splits.
-
- Yields:
- row data of dict type, where keys are column names
- """
- worker_info = torch.utils.data.get_worker_info()
-
- if worker_info is None:
- # Single-process data loading, iterate over all splits
- splits_to_process = self.splits
- else:
- # Multi-process data loading, partition splits across workers
- worker_id = worker_info.id
- num_workers = worker_info.num_workers
-
- # Calculate start and end indices for this worker
- # Distribute splits evenly by slicing
- total_splits = len(self.splits)
- splits_per_worker = total_splits // num_workers
- remainder = total_splits % num_workers
-
- # Workers with id < remainder get one extra split
- if worker_id < remainder:
- start_idx = worker_id * (splits_per_worker + 1)
- end_idx = start_idx + splits_per_worker + 1
- else:
- start_idx = worker_id * splits_per_worker + remainder
- end_idx = start_idx + splits_per_worker
-
- splits_to_process = self.splits[start_idx:end_idx]
-
- worker_iterator = self.table_read.to_iterator(splits_to_process)
-
- for offset_row in worker_iterator:
- row_dict = {}
- for i, field_name in enumerate(self.field_names):
- value = offset_row.get_field(i)
- row_dict[field_name] = value
- yield row_dict
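After this rename the Ray integration stands alone in ray_datasource.py, so the torch import above can be dropped. A hedged end-to-end sketch of the Ray path — only RayDatasource and ray.data.read_datasource appear in the hunks; the surrounding function is an assumption:

    # Sketch: read a Paimon table into a Ray Dataset via the renamed module.
    import ray
    from pypaimon.read.datasource.ray_datasource import RayDatasource

    def read_with_ray(table_read, splits):
        datasource = RayDatasource(table_read, splits)
        # RayDatasource clamps parallelism to len(splits) and builds one
        # ReadTask per chunk of splits; Ray executes the tasks in workers.
        return ray.data.read_datasource(datasource)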
diff --git a/paimon-python/pypaimon/read/datasource/torch_dataset.py b/paimon-python/pypaimon/read/datasource/torch_dataset.py
new file mode 100644
index 0000000000..a800295f9e
--- /dev/null
+++ b/paimon-python/pypaimon/read/datasource/torch_dataset.py
@@ -0,0 +1,142 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""
+Module to read a Paimon table into PyTorch Dataset.
+"""
+from typing import List
+
+import torch
+from torch.utils.data import Dataset, IterableDataset
+
+from pypaimon.read.split import Split
+from pypaimon.read.table_read import TableRead
+
+
+class TorchDataset(Dataset):
+ """
+ PyTorch Dataset implementation for reading Paimon table data.
+
+ This class enables Paimon table data to be used directly with PyTorch's
+ training pipeline, allowing for efficient data loading and batching.
+ """
+
+ def __init__(self, table_read: TableRead, splits: List[Split]):
+ """
+ Initialize TorchDataset.
+
+ Args:
+ table_read: TableRead instance for reading data
+ splits: List of splits to read
+ """
+ arrow_table = table_read.to_arrow(splits)
+ if arrow_table is None or arrow_table.num_rows == 0:
+ self._data = []
+ else:
+ self._data = arrow_table.to_pylist()
+
+ def __len__(self) -> int:
+ """
+ Return the total number of rows in the dataset.
+
+ Returns:
+ Total number of rows across all splits
+ """
+ return len(self._data)
+
+ def __getitem__(self, index: int):
+ """
+ Get a single item from the dataset.
+
+ Args:
+ index: Index of the item to retrieve
+
+ Returns:
+ Dictionary containing the row data
+ """
+ if not self._data:
+ return None
+
+ return self._data[index]
+
+
+class TorchIterDataset(IterableDataset):
+ """
+ PyTorch IterableDataset implementation for reading Paimon table data.
+
+ This class enables streaming data loading from Paimon tables, which is more
+ memory-efficient for large datasets. Data is read on-the-fly as needed,
+ rather than loading everything into memory upfront.
+ """
+
+ def __init__(self, table_read: TableRead, splits: List[Split]):
+ """
+ Initialize TorchIterDataset.
+
+ Args:
+ table_read: TableRead instance for reading data
+ splits: List of splits to read
+ """
+ self.table_read = table_read
+ self.splits = splits
+ # Get field names from read_type
+ self.field_names = [field.name for field in table_read.read_type]
+
+ def __iter__(self):
+ """
+ Iterate over the dataset, converting each OffsetRow to a dictionary.
+
+ Supports multi-worker data loading by partitioning splits across workers.
+ When num_workers > 0 in DataLoader, each worker will process a subset of splits.
+
+ Yields:
+ row data of dict type, where keys are column names
+ """
+ worker_info = torch.utils.data.get_worker_info()
+
+ if worker_info is None:
+ # Single-process data loading, iterate over all splits
+ splits_to_process = self.splits
+ else:
+ # Multi-process data loading, partition splits across workers
+ worker_id = worker_info.id
+ num_workers = worker_info.num_workers
+
+ # Calculate start and end indices for this worker
+ # Distribute splits evenly by slicing
+ total_splits = len(self.splits)
+ splits_per_worker = total_splits // num_workers
+ remainder = total_splits % num_workers
+
+ # Workers with id < remainder get one extra split
+ if worker_id < remainder:
+ start_idx = worker_id * (splits_per_worker + 1)
+ end_idx = start_idx + splits_per_worker + 1
+ else:
+ start_idx = worker_id * splits_per_worker + remainder
+ end_idx = start_idx + splits_per_worker
+
+ splits_to_process = self.splits[start_idx:end_idx]
+
+ worker_iterator = self.table_read.to_iterator(splits_to_process)
+
+ for offset_row in worker_iterator:
+ row_dict = {}
+ for i, field_name in enumerate(self.field_names):
+ value = offset_row.get_field(i)
+ row_dict[field_name] = value
+ yield row_dict
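The slicing in __iter__ hands each DataLoader worker a contiguous run of splits, with the first total_splits % num_workers workers taking one extra. A hedged usage sketch under that behavior (table and split construction elided):

    # Sketch: stream a Paimon table through a multi-worker DataLoader.
    # With 10 splits and num_workers=3, the workers get 4, 3 and 3 splits.
    from torch.utils.data import DataLoader
    from pypaimon.read.datasource.torch_dataset import TorchIterDataset

    def iterate_batches(table_read, splits):
        dataset = TorchIterDataset(table_read, splits)
        loader = DataLoader(dataset, batch_size=32, num_workers=3)
        for batch in loader:  # each row arrives as a column-name dict
            yield batch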
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index f893f376ae..f546c4be6b 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -221,7 +221,7 @@ class TableRead:
if override_num_blocks is not None and override_num_blocks < 1:
raise ValueError(f"override_num_blocks must be at least 1, got
{override_num_blocks}")
- from pypaimon.read.datasource import RayDatasource
+ from pypaimon.read.datasource.ray_datasource import RayDatasource
datasource = RayDatasource(self, splits)
return ray.data.read_datasource(
datasource,
@@ -234,11 +234,11 @@ class TableRead:
def to_torch(self, splits: List[Split], streaming: bool = False) -> "torch.utils.data.Dataset":
"""Wrap Paimon table data to PyTorch Dataset."""
if streaming:
- from pypaimon.read.datasource import TorchIterDataset
+ from pypaimon.read.datasource.torch_dataset import TorchIterDataset
dataset = TorchIterDataset(self, splits)
return dataset
else:
- from pypaimon.read.datasource import TorchDataset
+ from pypaimon.read.datasource.torch_dataset import TorchDataset
dataset = TorchDataset(self, splits)
return dataset
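With the imports now local to each branch, constructing a TableRead no longer requires torch or ray; the cost is paid only when an integration method is called. A short caller-side sketch — acquisition of table_read and splits from the usual scan path is assumed:

    # Sketch: pick the eager or streaming PyTorch dataset.
    eager = table_read.to_torch(splits)                      # TorchDataset
    streaming = table_read.to_torch(splits, streaming=True)  # TorchIterDataset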
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 9aa2af4168..1b631a643c 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -50,7 +50,18 @@ setup(
packages=PACKAGES,
include_package_data=True,
install_requires=install_requires,
- extras_require={},
+ extras_require={
+ 'ray': [
+ 'ray>=2.10,<3; python_version>="3.7"',
+ ],
+ 'torch': [
+ 'torch',
+ ],
+ 'all': [
+ 'ray>=2.10,<3; python_version>="3.7"',
+ 'torch',
+ ],
+ },
description="Apache Paimon Python API",
long_description=long_description,
long_description_content_type="text/markdown",
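The new extras make the heavy frameworks opt-in at install time. A sketch of the consumer-side contract; the distribution name "pypaimon" is an assumption, not stated in this commit:

    # Assumed install commands:
    #   pip install "pypaimon[ray]"    -> adds ray>=2.10,<3
    #   pip install "pypaimon[torch]"  -> adds torch
    #   pip install "pypaimon[all]"    -> adds both
    # Without the extra, importing the matching submodule fails fast:
    try:
        from pypaimon.read.datasource.torch_dataset import TorchDataset
    except ImportError:
        TorchDataset = None  # torch extra not installed; PyTorch path is off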