TheR1sing3un commented on code in PR #7740:
URL: https://github.com/apache/paimon/pull/7740#discussion_r3166793802
##########
paimon-python/pypaimon/read/datasource/ray_datasource.py:
##########
@@ -40,36 +39,163 @@
RAY_VERSION_PER_TASK_ROW_LIMIT = "2.52.0" # per_task_row_limit parameter
introduced
+def _paimon_read_task(splits, table, predicate, read_type, schema):
+ """Module-level read function that yields Arrow tables per batch.
+
+ Using a generator avoids loading every split's rows into memory at once —
+ memory usage stays proportional to one batch rather than the whole chunk.
+ """
+ from pypaimon.read.table_read import TableRead
+ worker_table_read = TableRead(table, predicate, read_type)
+ batch_reader = worker_table_read.to_arrow_batch_reader(splits)
+
+ has_data = False
+ for batch in iter(batch_reader.read_next_batch, None):
+ if batch.num_rows == 0:
+ continue
+ has_data = True
+ yield pyarrow.Table.from_batches([batch], schema=schema)
+
+ if not has_data:
+ yield pyarrow.Table.from_arrays(
+ [pyarrow.array([], type=field.type) for field in schema],
+ schema=schema,
+ )
+
+
class RayDatasource(Datasource):
"""
Ray Data Datasource implementation for reading Paimon tables.
- This datasource enables distributed parallel reading of Paimon table
splits,
- allowing Ray to read multiple splits concurrently across the cluster.
+ Self-contained: only requires a fully-qualified table identifier and the
+ catalog options. The catalog, table, and splits are loaded lazily so the
+ datasource is cheap to instantiate and easy to serialize across Ray workers
+ (mirrors Iceberg's ``IcebergDatasource``).
"""
- def __init__(self, table_read: TableRead, splits: List[Split]):
+ def __init__(
+ self,
+ table_identifier: Optional[str] = None,
+ catalog_options: Optional[Dict[str, str]] = None,
Review Comment:
> Perhaps we should do some abstraction, such as providing a `SplitProvider`
to obtain splits.
Do you mean to extract the logic of how to find the corresponding read
splits through identifiers, predicates, limits, and projections into a separate
method? To achieve possible reuse in the future? For example, the integration
of the high level into the daft engine
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]