rich7420 commented on code in PR #708:
URL: https://github.com/apache/mahout/pull/708#discussion_r2616353364
##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
Ok((all_data, num_samples, sample_size))
}
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading the entire file into memory.
+/// Supports efficient streaming of large files via a producer-consumer pattern.
+pub struct ParquetBlockReader {
+ reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+ sample_size: Option<usize>,
+ leftover_data: Vec<f64>,
+ leftover_cursor: usize,
+ pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+ /// Create a new streaming Parquet reader
+ ///
+    /// Uses a batch size of 2048 rows, which gives good throughput on large row groups.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let file = File::open(path.as_ref()).map_err(|e| {
+ MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+ })?;
+
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+ MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+ })?;
+
+        let total_rows = builder.metadata().file_metadata().num_rows() as usize;
+
+ let reader = builder
+ .with_batch_size(2048) // Optimized for large row groups
+ .build()
+ .map_err(|e| {
+                MahoutError::Io(format!("Failed to build Parquet reader: {}", e))
+ })?;
+
+ Ok(Self {
+ reader,
+ sample_size: None,
+ leftover_data: Vec::new(),
+ leftover_cursor: 0,
+ total_samples: total_rows,
+ })
+ }
+
+ /// Get the sample size (number of elements per sample)
+ pub fn get_sample_size(&self) -> Option<usize> {
+ self.sample_size
+ }
+
+ /// Read a chunk of data into the provided buffer
+ ///
+    /// Handles leftover data from previous reads and ensures sample boundaries are respected.
+ /// Returns the number of elements written to the buffer.
+ pub fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+ let mut written = 0;
+ let buf_cap = buffer.len();
+ let calc_limit = |ss: usize| -> usize {
+ if ss == 0 {
+ buf_cap
+ } else {
+ (buf_cap / ss) * ss
+ }
+ };
+ let mut limit = self.sample_size.map_or(buf_cap, calc_limit);
+
+ // Drain leftover data from previous read
+ if self.leftover_cursor < self.leftover_data.len() {
+ let available = self.leftover_data.len() - self.leftover_cursor;
+ let space_left = limit - written;
+ let to_copy = std::cmp::min(available, space_left);
+
+ if to_copy > 0 {
+                buffer[written..written+to_copy].copy_from_slice(
+                    &self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy]
+                );
+ written += to_copy;
+ self.leftover_cursor += to_copy;
+ if self.leftover_cursor == self.leftover_data.len() {
+ self.leftover_data.clear();
+ self.leftover_cursor = 0;
+ }
+ }
+ if written == limit {
+ return Ok(written);
+ }
+ }
+
+ // Read new batches from Parquet
+ while written < limit {
+ match self.reader.next() {
+ Some(Ok(batch)) => {
+ if batch.num_columns() == 0 {
+ continue;
+ }
+ let column = batch.column(0);
+
+ let list_array = column
+ .as_any()
+ .downcast_ref::<ListArray>()
+                        .ok_or_else(|| MahoutError::Io("Failed to downcast to ListArray".to_string()))?;
+
+ if list_array.len() == 0 {
+ continue;
+ }
+
+ // Extract sample size from first element
+ let first_value = list_array.value(0);
+ let float_array = first_value
+ .as_any()
+ .downcast_ref::<Float64Array>()
+                        .ok_or_else(|| MahoutError::Io("List values must be Float64".to_string()))?;
+
+ let current_sample_size = float_array.len();
+
+ if self.sample_size.is_none() {
+ self.sample_size = Some(current_sample_size);
+ limit = calc_limit(current_sample_size);
+ }
Review Comment:
sure, we should.
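
For reference, here is a rough sketch of how this streaming reader could be driven from the producer side, with whole-sample chunks handed to a consumer thread over a channel. It is illustrative only, not part of the PR: the qdp_core::io module path, the Result alias over MahoutError, the buffer size, and the stream_samples helper are assumptions; only ParquetBlockReader::new, read_chunk, get_sample_size, and total_samples come from the diff above.

use std::sync::mpsc;
use std::thread;

// Sketch only (not part of this PR). The module path and Result alias are
// assumptions; adjust to wherever ParquetBlockReader is actually exported.
use qdp_core::io::{ParquetBlockReader, Result};

fn stream_samples(path: &str) -> Result<()> {
    let mut reader = ParquetBlockReader::new(path)?;

    // Consumer: receives chunks that always end on a sample boundary.
    let (tx, rx) = mpsc::channel::<Vec<f64>>();
    let consumer = thread::spawn(move || {
        for chunk in rx {
            // chunk.len() is a multiple of the sample size, so no sample
            // ever needs to be stitched back together here.
            println!("received {} values", chunk.len());
        }
    });

    // Producer: read_chunk fills the buffer up to a whole number of samples
    // and returns 0 once the file is exhausted.
    let mut buffer = vec![0.0_f64; 1 << 20]; // arbitrary ~8 MiB of f64s
    loop {
        let n = reader.read_chunk(&mut buffer)?;
        if n == 0 {
            break;
        }
        tx.send(buffer[..n].to_vec()).expect("consumer disconnected");
    }
    drop(tx); // close the channel so the consumer loop ends
    consumer.join().expect("consumer panicked");

    println!(
        "streamed {} samples, sample size {:?}",
        reader.total_samples,
        reader.get_sample_size()
    );
    Ok(())
}

The property the consumer relies on is the one read_chunk documents above: a chunk never splits a sample, so downstream code can slice each chunk by sample_size without any reassembly.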
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]