machichima commented on code in PR #708:
URL: https://github.com/apache/mahout/pull/708#discussion_r2616147157
##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
Ok((all_data, num_samples, sample_size))
}
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+ reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+ sample_size: Option<usize>,
+ leftover_data: Vec<f64>,
+ leftover_cursor: usize,
+ pub total_samples: usize,
Review Comment:
nit: the `total_samples` name is a bit confusing as we also have
`sample_size`. Could we use `total_rows`?
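Something like this, for example (just a sketch, other fields unchanged):

```rust
pub struct ParquetBlockReader {
    // ... other fields as in the PR ...
    /// Total number of rows in the Parquet file (distinct from the per-sample length)
    pub total_rows: usize,
}
```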
##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
Ok((all_data, num_samples, sample_size))
}
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+ reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+ sample_size: Option<usize>,
+ leftover_data: Vec<f64>,
+ leftover_cursor: usize,
+ pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+ /// Create a new streaming Parquet reader
+ ///
+ /// Uses batch size 2048 for optimal throughput on large row groups.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let file = File::open(path.as_ref()).map_err(|e| {
+ MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+ })?;
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+ MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+ })?;
+
+ let total_rows = builder.metadata().file_metadata().num_rows() as usize;
+
+ let reader = builder
+ .with_batch_size(2048) // Optimized for large row groups
Review Comment:
nit: do we want to accept `batch_size` as an argument so it's configurable?
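Something along these lines, maybe (just a sketch: the signature and the trailing field initializers are my guesses, since the rest of `new()` isn't visible in this hunk):

```rust
pub fn new<P: AsRef<Path>>(path: P, batch_size: usize) -> Result<Self> {
    let file = File::open(path.as_ref())
        .map_err(|e| MahoutError::Io(format!("Failed to open Parquet file: {}", e)))?;

    let builder = ParquetRecordBatchReaderBuilder::try_new(file)
        .map_err(|e| MahoutError::Io(format!("Failed to create Parquet reader: {}", e)))?;

    let total_rows = builder.metadata().file_metadata().num_rows() as usize;

    let reader = builder
        .with_batch_size(batch_size) // caller-provided instead of the hard-coded 2048
        .build()
        .map_err(|e| MahoutError::Io(format!("Failed to build Parquet reader: {}", e)))?;

    // Field initializers below are my assumption of how the PR's new() finishes
    Ok(Self {
        reader,
        sample_size: None,
        leftover_data: Vec::new(),
        leftover_cursor: 0,
        total_samples: total_rows,
    })
}
```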
##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
Ok((all_data, num_samples, sample_size))
}
+
+/// Streaming Parquet reader for List<Float64> columns
Review Comment:
Just curious, are we planning to support `FixedSizeList` like we do in `read_arrow_ipc_batch`?
https://github.com/apache/mahout/blob/b17c3f6c9dfe8e23f7c276268b13baedf62504f3/qdp/qdp-core/src/io.rs#L354
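If we want to, the value extraction could probably branch on the column's data type, roughly like this (illustrative only: it just grabs the flat values buffer and ignores offsets/nulls, and the helper name is made up):

```rust
use arrow::array::{Array, FixedSizeListArray, Float64Array, ListArray};
use arrow::datatypes::DataType;

// Hypothetical helper: pull the Float64 child array out of either list layout.
fn column_values(column: &dyn Array) -> Option<Float64Array> {
    match column.data_type() {
        DataType::List(_) => {
            let list = column.as_any().downcast_ref::<ListArray>()?;
            list.values().as_any().downcast_ref::<Float64Array>().cloned()
        }
        DataType::FixedSizeList(_, _) => {
            let list = column.as_any().downcast_ref::<FixedSizeListArray>()?;
            list.values().as_any().downcast_ref::<Float64Array>().cloned()
        }
        _ => None,
    }
}
```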
##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
Ok((all_data, num_samples, sample_size))
}
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+ reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+ sample_size: Option<usize>,
+ leftover_data: Vec<f64>,
+ leftover_cursor: usize,
+ pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+ /// Create a new streaming Parquet reader
+ ///
+ /// Uses batch size 2048 for optimal throughput on large row groups.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
Review Comment:
If I understand correctly, this `ParquetBlockReader` only supports the `List<Float64>` column type? If so, I think we can add schema validation in `new()` to check that `builder.schema()` matches what we expect, so we fail early here.
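Something like this, for example (just a sketch; it assumes the list column is the first column, and the helper name and error wording are placeholders):

```rust
use arrow::datatypes::{DataType, Schema};

// Hypothetical helper, called from new() right after try_new() succeeds,
// e.g. validate_schema(builder.schema())?;
// Assumes the List<Float64> column is the first one (real code should also
// check the field count).
fn validate_schema(schema: &Schema) -> Result<()> {
    match schema.field(0).data_type() {
        DataType::List(inner) if inner.data_type() == &DataType::Float64 => Ok(()),
        other => Err(MahoutError::Io(format!(
            "Expected a List<Float64> column, got {:?}",
            other
        ))),
    }
}
```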