TheR1sing3un commented on code in PR #7740:
URL: https://github.com/apache/paimon/pull/7740#discussion_r3176863716


##########
paimon-python/pypaimon/read/datasource/ray_datasource.py:
##########
@@ -40,36 +39,133 @@
 RAY_VERSION_PER_TASK_ROW_LIMIT = "2.52.0"  # per_task_row_limit parameter 
introduced
 
 
+def _paimon_read_task(splits, table, predicate, read_type, schema):
+    """Module-level read function that yields Arrow tables per batch.
+
+    Using a generator avoids loading every split's rows into memory at once —
+    memory usage stays proportional to one batch rather than the whole chunk.
+    """
+    from pypaimon.read.table_read import TableRead
+    worker_table_read = TableRead(table, predicate, read_type)
+    batch_reader = worker_table_read.to_arrow_batch_reader(splits)
+
+    has_data = False
+    for batch in iter(batch_reader.read_next_batch, None):
+        if batch.num_rows == 0:
+            continue
+        has_data = True
+        yield pyarrow.Table.from_batches([batch], schema=schema)
+
+    if not has_data:
+        yield pyarrow.Table.from_arrays(
+            [pyarrow.array([], type=field.type) for field in schema],
+            schema=schema,
+        )
+
+
 class RayDatasource(Datasource):
     """
     Ray Data Datasource implementation for reading Paimon tables.
 
-    This datasource enables distributed parallel reading of Paimon table 
splits,
-    allowing Ray to read multiple splits concurrently across the cluster.
+    Self-contained: only requires a fully-qualified table identifier and the
+    catalog options. The catalog, table, and splits are loaded lazily so the
+    datasource is cheap to instantiate and easy to serialize across Ray workers
+    (mirrors Iceberg's ``IcebergDatasource``).
     """
 
-    def __init__(self, table_read: TableRead, splits: List[Split]):
+    def __init__(
+        self,
+        table_identifier: str,
+        catalog_options: Dict[str, str],
+        predicate=None,
+        projection: Optional[List[str]] = None,
+        limit: Optional[int] = None,
+    ):
         """
-        Initialize PaimonDatasource.
+        Initialize RayDatasource.
 
         Args:
-            table_read: TableRead instance for reading data
-            splits: List of splits to read
+            table_identifier: Fully qualified table name, e.g. ``"db.table"``.
+            catalog_options: Options passed to ``CatalogFactory.create()``.
+            predicate: Optional ``Predicate`` for scan-time filtering.
+            projection: Optional list of column names to read.
+            limit: Optional row limit for the scan.
         """
-        self.table_read = table_read
-        self.splits = splits
+        self.table_identifier = table_identifier
+        self.catalog_options = catalog_options
+        self.predicate = predicate
+        self.projection = projection
+        self.limit = limit
+        self._table = None
+        self._splits = None
+        self._read_type = None
         self._schema = None
 
+    @property
+    def table(self):
+        """Lazily load the table from the catalog."""
+        if self._table is None:
+            from pypaimon.catalog.catalog_factory import CatalogFactory
+            catalog = CatalogFactory.create(self.catalog_options)
+            self._table = catalog.get_table(self.table_identifier)
+        return self._table
+
+    @property
+    def splits(self):
+        """Lazily plan splits from the table."""
+        if self._splits is None:
+            self._plan()
+        return self._splits
+
+    @property
+    def read_type(self):
+        """Lazily resolve the scan read type from filter / projection / 
limit."""
+        if self._read_type is None:
+            self._plan()
+        return self._read_type
+
+    def _plan(self):

Review Comment:
   > add a _ensure_planned() for this.
   
   done



##########
paimon-python/pypaimon/read/datasource/ray_datasource.py:
##########
@@ -40,36 +39,133 @@
 RAY_VERSION_PER_TASK_ROW_LIMIT = "2.52.0"  # per_task_row_limit parameter 
introduced
 
 
+def _paimon_read_task(splits, table, predicate, read_type, schema):
+    """Module-level read function that yields Arrow tables per batch.
+
+    Using a generator avoids loading every split's rows into memory at once —
+    memory usage stays proportional to one batch rather than the whole chunk.
+    """
+    from pypaimon.read.table_read import TableRead
+    worker_table_read = TableRead(table, predicate, read_type)
+    batch_reader = worker_table_read.to_arrow_batch_reader(splits)
+
+    has_data = False
+    for batch in iter(batch_reader.read_next_batch, None):
+        if batch.num_rows == 0:
+            continue
+        has_data = True
+        yield pyarrow.Table.from_batches([batch], schema=schema)
+
+    if not has_data:
+        yield pyarrow.Table.from_arrays(
+            [pyarrow.array([], type=field.type) for field in schema],
+            schema=schema,
+        )
+
+
 class RayDatasource(Datasource):
     """
     Ray Data Datasource implementation for reading Paimon tables.
 
-    This datasource enables distributed parallel reading of Paimon table 
splits,
-    allowing Ray to read multiple splits concurrently across the cluster.
+    Self-contained: only requires a fully-qualified table identifier and the
+    catalog options. The catalog, table, and splits are loaded lazily so the
+    datasource is cheap to instantiate and easy to serialize across Ray workers
+    (mirrors Iceberg's ``IcebergDatasource``).
     """
 
-    def __init__(self, table_read: TableRead, splits: List[Split]):
+    def __init__(
+        self,
+        table_identifier: str,
+        catalog_options: Dict[str, str],
+        predicate=None,
+        projection: Optional[List[str]] = None,
+        limit: Optional[int] = None,
+    ):
         """
-        Initialize PaimonDatasource.
+        Initialize RayDatasource.
 
         Args:
-            table_read: TableRead instance for reading data
-            splits: List of splits to read
+            table_identifier: Fully qualified table name, e.g. ``"db.table"``.
+            catalog_options: Options passed to ``CatalogFactory.create()``.
+            predicate: Optional ``Predicate`` for scan-time filtering.
+            projection: Optional list of column names to read.
+            limit: Optional row limit for the scan.
         """
-        self.table_read = table_read
-        self.splits = splits
+        self.table_identifier = table_identifier
+        self.catalog_options = catalog_options
+        self.predicate = predicate
+        self.projection = projection
+        self.limit = limit
+        self._table = None
+        self._splits = None
+        self._read_type = None
         self._schema = None
 
+    @property
+    def table(self):
+        """Lazily load the table from the catalog."""
+        if self._table is None:
+            from pypaimon.catalog.catalog_factory import CatalogFactory
+            catalog = CatalogFactory.create(self.catalog_options)
+            self._table = catalog.get_table(self.table_identifier)
+        return self._table
+
+    @property
+    def splits(self):
+        """Lazily plan splits from the table."""
+        if self._splits is None:
+            self._plan()
+        return self._splits
+
+    @property
+    def read_type(self):
+        """Lazily resolve the scan read type from filter / projection / 
limit."""
+        if self._read_type is None:
+            self._plan()
+        return self._read_type
+
+    def _plan(self):
+        from pypaimon.read.read_builder import ReadBuilder
+        rb = ReadBuilder(self.table)
+        if self.predicate is not None:
+            rb = rb.with_filter(self.predicate)
+        if self.projection is not None:
+            rb = rb.with_projection(self.projection)
+        if self.limit is not None:
+            rb = rb.with_limit(self.limit)
+        self._read_type = rb.read_type()
+        self._splits = rb.new_scan().plan().splits()
+
+    @classmethod
+    def _from_table_read(cls, table_read, splits):
+        """Bridge for ``TableRead.to_ray()`` — wraps an already-resolved
+        ``(table_read, splits)`` pair without going back through the catalog.
+        """
+        ds = cls.__new__(cls)

Review Comment:
   > A safer pattern is to call **init** with sentinel values or use a flag 
parameter.
   
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to