This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f08c561409 [python] light refactor datasource modules to decouple optional dependencies (#7086)
f08c561409 is described below

commit f08c561409a4349291ab163f99a7a09b8d1adfc1
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Jan 21 13:34:21 2026 +0800

    [python] light refactor datasource modules to decouple optional dependencies (#7086)
---
 paimon-python/dev/requirements.txt                 |   2 -
 paimon-python/pypaimon/read/datasource/__init__.py |  25 ++++
 .../ray_datasource.py}                             | 128 +------------------
 .../pypaimon/read/datasource/torch_dataset.py      | 142 +++++++++++++++++++++
 paimon-python/pypaimon/read/table_read.py          |   6 +-
 paimon-python/setup.py                             |  13 +-
 6 files changed, 183 insertions(+), 133 deletions(-)

diff --git a/paimon-python/dev/requirements.txt b/paimon-python/dev/requirements.txt
index 993cb46a25..aa922eca16 100644
--- a/paimon-python/dev/requirements.txt
+++ b/paimon-python/dev/requirements.txt
@@ -35,9 +35,7 @@ pyarrow>=16,<20; python_version >= "3.8"
 pylance>=0.20,<1; python_version>="3.9"
 pylance>=0.10,<1; python_version>="3.8" and python_version<"3.9"
 pyroaring
-ray>=2.10,<3; python_version>="3.7"
 readerwriterlock>=1,<2
-torch
 zstandard>=0.19,<1
 cramjam>=1.3.0,<3; python_version>="3.7"
 faiss-cpu==1.7.2; python_version >= "3.6" and python_version < "3.7"
diff --git a/paimon-python/pypaimon/read/datasource/__init__.py b/paimon-python/pypaimon/read/datasource/__init__.py
new file mode 100644
index 0000000000..b3a59c22f9
--- /dev/null
+++ b/paimon-python/pypaimon/read/datasource/__init__.py
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""
+Datasource implementations for reading Paimon tables into various data processing frameworks.
+
+This package provides optional integrations with:
+- Ray: for distributed data processing (ray_datasource.py)
+- PyTorch: for machine learning training pipelines (torch_dataset.py)
+
+"""
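
With the integrations split into per-framework submodules, a caller only pays for the dependency it actually imports. A minimal sketch of that, assuming only the module paths introduced in this commit (the try/except guards are illustrative and not part of the package):

    try:
        # needs the optional 'ray' dependency
        from pypaimon.read.datasource.ray_datasource import RayDatasource
    except ImportError:
        RayDatasource = None

    try:
        # needs the optional 'torch' dependency
        from pypaimon.read.datasource.torch_dataset import TorchDataset, TorchIterDataset
    except ImportError:
        TorchDataset = TorchIterDataset = None
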
diff --git a/paimon-python/pypaimon/read/datasource.py b/paimon-python/pypaimon/read/datasource/ray_datasource.py
similarity index 67%
rename from paimon-python/pypaimon/read/datasource.py
rename to paimon-python/pypaimon/read/datasource/ray_datasource.py
index 835effbf0b..7f78fbad6f 100644
--- a/paimon-python/pypaimon/read/datasource.py
+++ b/paimon-python/pypaimon/read/datasource/ray_datasource.py
@@ -27,7 +27,7 @@ from typing import List, Optional, Iterable
 import pyarrow
 from packaging.version import parse
 import ray
-import torch
+from ray.data.datasource import Datasource
 
 from pypaimon.read.split import Split
 from pypaimon.read.table_read import TableRead
@@ -39,10 +39,6 @@ logger = logging.getLogger(__name__)
 RAY_VERSION_SCHEMA_IN_READ_TASK = "2.48.0"  # Schema moved from BlockMetadata to ReadTask
 RAY_VERSION_PER_TASK_ROW_LIMIT = "2.52.0"  # per_task_row_limit parameter introduced
 
-from ray.data.datasource import Datasource
-
-from torch.utils.data import Dataset, IterableDataset
-
 
 class RayDatasource(Datasource):
     """
@@ -73,7 +69,6 @@ class RayDatasource(Datasource):
         if not self.splits:
             return 0
 
-        # Sum up file sizes from all splits
         total_size = sum(split.file_size for split in self.splits)
         return total_size if total_size > 0 else None
 
@@ -110,15 +105,12 @@ class RayDatasource(Datasource):
 
         per_task_row_limit = kwargs.get('per_task_row_limit', None)
 
-        # Validate parallelism parameter
         if parallelism < 1:
             raise ValueError(f"parallelism must be at least 1, got {parallelism}")
 
-        # Get schema for metadata
         if self._schema is None:
             self._schema = PyarrowFieldParser.from_paimon_schema(self.table_read.read_type)
 
-        # Adjust parallelism if it exceeds the number of splits
         if parallelism > len(self.splits):
             parallelism = len(self.splits)
             logger.warning(
@@ -145,14 +137,11 @@ class RayDatasource(Datasource):
             from pypaimon.read.table_read import TableRead
             worker_table_read = TableRead(table, predicate, read_type)
 
-            # Read all splits in this chunk
             arrow_table = worker_table_read.to_arrow(splits)
 
-            # Return as a list to allow Ray to split into multiple blocks if needed
             if arrow_table is not None and arrow_table.num_rows > 0:
                 return [arrow_table]
             else:
-                # Return empty table with correct schema
                 empty_table = pyarrow.Table.from_arrays(
                     [pyarrow.array([], type=field.type) for field in schema],
                     schema=schema
@@ -229,118 +218,3 @@ class RayDatasource(Datasource):
             read_tasks.append(ReadTask(**read_task_kwargs))
 
         return read_tasks
-
-
-class TorchDataset(Dataset):
-    """
-    PyTorch Dataset implementation for reading Paimon table data.
-
-    This class enables Paimon table data to be used directly with PyTorch's
-    training pipeline, allowing for efficient data loading and batching.
-    """
-
-    def __init__(self, table_read: TableRead, splits: List[Split]):
-        """
-        Initialize TorchDataset.
-
-        Args:
-            table_read: TableRead instance for reading data
-            splits: List of splits to read
-        """
-        arrow_table = table_read.to_arrow(splits)
-        if arrow_table is None or arrow_table.num_rows == 0:
-            self._data = []
-        else:
-            self._data = arrow_table.to_pylist()
-
-    def __len__(self) -> int:
-        """
-        Return the total number of rows in the dataset.
-
-        Returns:
-            Total number of rows across all splits
-        """
-        return len(self._data)
-
-    def __getitem__(self, index: int):
-        """
-        Get a single item from the dataset.
-
-        Args:
-            index: Index of the item to retrieve
-
-        Returns:
-            Dictionary containing the row data
-        """
-        if not self._data:
-            return None
-
-        return self._data[index]
-
-
-class TorchIterDataset(IterableDataset):
-    """
-    PyTorch IterableDataset implementation for reading Paimon table data.
-
-    This class enables streaming data loading from Paimon tables, which is more
-    memory-efficient for large datasets. Data is read on-the-fly as needed,
-    rather than loading everything into memory upfront.
-    """
-
-    def __init__(self, table_read: TableRead, splits: List[Split]):
-        """
-        Initialize TorchIterDataset.
-
-        Args:
-            table_read: TableRead instance for reading data
-            splits: List of splits to read
-        """
-        self.table_read = table_read
-        self.splits = splits
-        # Get field names from read_type
-        self.field_names = [field.name for field in table_read.read_type]
-
-    def __iter__(self):
-        """
-        Iterate over the dataset, converting each OffsetRow to a dictionary.
-
-        Supports multi-worker data loading by partitioning splits across workers.
-        When num_workers > 0 in DataLoader, each worker will process a subset of splits.
-
-        Yields:
-            row data of dict type, where keys are column names
-        """
-        worker_info = torch.utils.data.get_worker_info()
-
-        if worker_info is None:
-            # Single-process data loading, iterate over all splits
-            splits_to_process = self.splits
-        else:
-            # Multi-process data loading, partition splits across workers
-            worker_id = worker_info.id
-            num_workers = worker_info.num_workers
-
-            # Calculate start and end indices for this worker
-            # Distribute splits evenly by slicing
-            total_splits = len(self.splits)
-            splits_per_worker = total_splits // num_workers
-            remainder = total_splits % num_workers
-
-            # Workers with id < remainder get one extra split
-            if worker_id < remainder:
-                start_idx = worker_id * (splits_per_worker + 1)
-                end_idx = start_idx + splits_per_worker + 1
-            else:
-                start_idx = worker_id * splits_per_worker + remainder
-                end_idx = start_idx + splits_per_worker
-
-            splits_to_process = self.splits[start_idx:end_idx]
-
-        worker_iterator = self.table_read.to_iterator(splits_to_process)
-
-        for offset_row in worker_iterator:
-            row_dict = {}
-            for i, field_name in enumerate(self.field_names):
-                value = offset_row.get_field(i)
-                row_dict[field_name] = value
-            yield row_dict
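
For context, a hedged sketch of driving the renamed module directly (table_read and splits are assumed to already exist; the usual path goes through TableRead rather than constructing the datasource by hand):

    import ray
    from pypaimon.read.datasource.ray_datasource import RayDatasource

    # table_read: TableRead, splits: List[Split] -- assumed to be prepared elsewhere
    datasource = RayDatasource(table_read, splits)
    ds = ray.data.read_datasource(datasource)
    print(ds.schema())
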
diff --git a/paimon-python/pypaimon/read/datasource/torch_dataset.py b/paimon-python/pypaimon/read/datasource/torch_dataset.py
new file mode 100644
index 0000000000..a800295f9e
--- /dev/null
+++ b/paimon-python/pypaimon/read/datasource/torch_dataset.py
@@ -0,0 +1,142 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""
+Module to read a Paimon table into PyTorch Dataset.
+"""
+from typing import List
+
+import torch
+from torch.utils.data import Dataset, IterableDataset
+
+from pypaimon.read.split import Split
+from pypaimon.read.table_read import TableRead
+
+
+class TorchDataset(Dataset):
+    """
+    PyTorch Dataset implementation for reading Paimon table data.
+
+    This class enables Paimon table data to be used directly with PyTorch's
+    training pipeline, allowing for efficient data loading and batching.
+    """
+
+    def __init__(self, table_read: TableRead, splits: List[Split]):
+        """
+        Initialize TorchDataset.
+
+        Args:
+            table_read: TableRead instance for reading data
+            splits: List of splits to read
+        """
+        arrow_table = table_read.to_arrow(splits)
+        if arrow_table is None or arrow_table.num_rows == 0:
+            self._data = []
+        else:
+            self._data = arrow_table.to_pylist()
+
+    def __len__(self) -> int:
+        """
+        Return the total number of rows in the dataset.
+
+        Returns:
+            Total number of rows across all splits
+        """
+        return len(self._data)
+
+    def __getitem__(self, index: int):
+        """
+        Get a single item from the dataset.
+
+        Args:
+            index: Index of the item to retrieve
+
+        Returns:
+            Dictionary containing the row data
+        """
+        if not self._data:
+            return None
+
+        return self._data[index]
+
+
+class TorchIterDataset(IterableDataset):
+    """
+    PyTorch IterableDataset implementation for reading Paimon table data.
+
+    This class enables streaming data loading from Paimon tables, which is more
+    memory-efficient for large datasets. Data is read on-the-fly as needed,
+    rather than loading everything into memory upfront.
+    """
+
+    def __init__(self, table_read: TableRead, splits: List[Split]):
+        """
+        Initialize TorchIterDataset.
+
+        Args:
+            table_read: TableRead instance for reading data
+            splits: List of splits to read
+        """
+        self.table_read = table_read
+        self.splits = splits
+        # Get field names from read_type
+        self.field_names = [field.name for field in table_read.read_type]
+
+    def __iter__(self):
+        """
+        Iterate over the dataset, converting each OffsetRow to a dictionary.
+
+        Supports multi-worker data loading by partitioning splits across workers.
+        When num_workers > 0 in DataLoader, each worker will process a subset of splits.
+
+        Yields:
+            row data of dict type, where keys are column names
+        """
+        worker_info = torch.utils.data.get_worker_info()
+
+        if worker_info is None:
+            # Single-process data loading, iterate over all splits
+            splits_to_process = self.splits
+        else:
+            # Multi-process data loading, partition splits across workers
+            worker_id = worker_info.id
+            num_workers = worker_info.num_workers
+
+            # Calculate start and end indices for this worker
+            # Distribute splits evenly by slicing
+            total_splits = len(self.splits)
+            splits_per_worker = total_splits // num_workers
+            remainder = total_splits % num_workers
+
+            # Workers with id < remainder get one extra split
+            if worker_id < remainder:
+                start_idx = worker_id * (splits_per_worker + 1)
+                end_idx = start_idx + splits_per_worker + 1
+            else:
+                start_idx = worker_id * splits_per_worker + remainder
+                end_idx = start_idx + splits_per_worker
+
+            splits_to_process = self.splits[start_idx:end_idx]
+
+        worker_iterator = self.table_read.to_iterator(splits_to_process)
+
+        for offset_row in worker_iterator:
+            row_dict = {}
+            for i, field_name in enumerate(self.field_names):
+                value = offset_row.get_field(i)
+                row_dict[field_name] = value
+            yield row_dict
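
A hedged usage sketch for the streaming path (table_read and splits are assumed to already exist): with num_workers > 0, each DataLoader worker reads only its slice of splits, as computed in __iter__ above. The same classes remain reachable through TableRead.to_torch(splits, streaming=...), which now imports them lazily from this submodule, as the table_read.py hunk below shows.

    from torch.utils.data import DataLoader
    from pypaimon.read.datasource.torch_dataset import TorchIterDataset

    dataset = TorchIterDataset(table_read, splits)   # streaming=True path
    loader = DataLoader(dataset, batch_size=32, num_workers=4)
    for batch in loader:
        # default collation yields a dict keyed by column name
        pass
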
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index f893f376ae..f546c4be6b 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -221,7 +221,7 @@ class TableRead:
         if override_num_blocks is not None and override_num_blocks < 1:
             raise ValueError(f"override_num_blocks must be at least 1, got {override_num_blocks}")
 
-        from pypaimon.read.datasource import RayDatasource
+        from pypaimon.read.datasource.ray_datasource import RayDatasource
         datasource = RayDatasource(self, splits)
         return ray.data.read_datasource(
             datasource,
@@ -234,11 +234,11 @@ class TableRead:
     def to_torch(self, splits: List[Split], streaming: bool = False) -> "torch.utils.data.Dataset":
         """Wrap Paimon table data to PyTorch Dataset."""
         if streaming:
-            from pypaimon.read.datasource import TorchIterDataset
+            from pypaimon.read.datasource.torch_dataset import TorchIterDataset
             dataset = TorchIterDataset(self, splits)
             return dataset
         else:
-            from pypaimon.read.datasource import TorchDataset
+            from pypaimon.read.datasource.torch_dataset import TorchDataset
             dataset = TorchDataset(self, splits)
             return dataset
 
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 9aa2af4168..1b631a643c 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -50,7 +50,18 @@ setup(
     packages=PACKAGES,
     include_package_data=True,
     install_requires=install_requires,
-    extras_require={},
+    extras_require={
+        'ray': [
+            'ray>=2.10,<3; python_version>="3.7"',
+        ],
+        'torch': [
+            'torch',
+        ],
+        'all': [
+            'ray>=2.10,<3; python_version>="3.7"',
+            'torch',
+        ],
+    },
     description="Apache Paimon Python API",
     long_description=long_description,
     long_description_content_type="text/markdown",
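
With the new extras in place, installation becomes opt-in per integration. Assuming the distribution is published under the pypaimon name, the choices look like:

    pip install pypaimon            # core only, neither ray nor torch
    pip install "pypaimon[ray]"     # adds the Ray integration
    pip install "pypaimon[torch]"   # adds the PyTorch integration
    pip install "pypaimon[all]"     # both optional dependencies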
