machichima commented on code in PR #708:
URL: https://github.com/apache/mahout/pull/708#discussion_r2616266358


##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
 
     Ok((all_data, num_samples, sample_size))
 }
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+    reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+    sample_size: Option<usize>,
+    leftover_data: Vec<f64>,
+    leftover_cursor: usize,
+    pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+    /// Create a new streaming Parquet reader
+    ///
+    /// Uses batch size 2048 for optimal throughput on large row groups.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let file = File::open(path.as_ref()).map_err(|e| {
+            MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+        })?;
+
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+            MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+        })?;
+
+        let total_rows = builder.metadata().file_metadata().num_rows() as usize;
+
+        let reader = builder
+            .with_batch_size(2048) // Optimized for large row groups
+            .build()
+            .map_err(|e| {
+                MahoutError::Io(format!("Failed to build Parquet reader: {}", e))
+            })?;
+
+        Ok(Self {
+            reader,
+            sample_size: None,
+            leftover_data: Vec::new(),
+            leftover_cursor: 0,
+            total_samples: total_rows,
+        })
+    }
+
+    /// Get the sample size (number of elements per sample)
+    pub fn get_sample_size(&self) -> Option<usize> {
+        self.sample_size
+    }
+
+    /// Read a chunk of data into the provided buffer
+    ///
+    /// Handles leftover data from previous reads and ensures sample boundaries are respected.
+    /// Returns the number of elements written to the buffer.
+    pub fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+        let mut written = 0;
+        let buf_cap = buffer.len();
+        let calc_limit = |ss: usize| -> usize {
+            if ss == 0 {
+                buf_cap
+            } else {
+                (buf_cap / ss) * ss
+            }
+        };

Review Comment:
   Please correct me if I'm wrong. I think this part is meant to ensure we read the full sample for each data row? If so, I think we also need to handle the case of the first call to this function, when `self.sample_size.is_none()`.
   
   We don't update `limit` once we learn the actual `self.sample_size`, so we may end up with an incomplete sample.
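   
   Something along these lines is what I have in mind (an untested sketch, reusing `calc_limit`, `limit`, and `written` from the surrounding function):
   
   ```rust
   // Hypothetical: on the very first call self.sample_size is None, so the
   // initial limit is just buffer.len(). As soon as the first batch reveals
   // the real sample size, clamp the limit so this call still returns whole
   // samples only.
   if self.sample_size.is_none() {
       self.sample_size = Some(current_sample_size);
       limit = calc_limit(current_sample_size);
       debug_assert!(written <= limit, "drained leftovers should fit the clamped limit");
   }
   ```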



##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
 
     Ok((all_data, num_samples, sample_size))
 }
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+    reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+    sample_size: Option<usize>,
+    leftover_data: Vec<f64>,
+    leftover_cursor: usize,
+    pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+    /// Create a new streaming Parquet reader
+    ///
+    /// Uses batch size 2048 for optimal throughput on large row groups.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let file = File::open(path.as_ref()).map_err(|e| {
+            MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+        })?;
+
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+            MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+        })?;
+
+        let total_rows = builder.metadata().file_metadata().num_rows() as usize;
+
+        let reader = builder
+            .with_batch_size(2048) // Optimized for large row groups
+            .build()
+            .map_err(|e| {
+                MahoutError::Io(format!("Failed to build Parquet reader: {}", e))
+            })?;
+
+        Ok(Self {
+            reader,
+            sample_size: None,
+            leftover_data: Vec::new(),
+            leftover_cursor: 0,
+            total_samples: total_rows,
+        })
+    }
+
+    /// Get the sample size (number of elements per sample)
+    pub fn get_sample_size(&self) -> Option<usize> {
+        self.sample_size
+    }
+
+    /// Read a chunk of data into the provided buffer
+    ///
+    /// Handles leftover data from previous reads and ensures sample boundaries are respected.
+    /// Returns the number of elements written to the buffer.
+    pub fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+        let mut written = 0;
+        let buf_cap = buffer.len();
+        let calc_limit = |ss: usize| -> usize {
+            if ss == 0 {
+                buf_cap
+            } else {
+                (buf_cap / ss) * ss
+            }
+        };
+        let mut limit = self.sample_size.map_or(buf_cap, calc_limit);
+
+        // Drain leftover data from previous read
+        if self.leftover_cursor < self.leftover_data.len() {
+            let available = self.leftover_data.len() - self.leftover_cursor;
+            let space_left = limit - written;
+            let to_copy = std::cmp::min(available, space_left);
+
+            if to_copy > 0 {
+                buffer[written..written+to_copy].copy_from_slice(
+                    &self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy]
+                );
+                written += to_copy;
+                self.leftover_cursor += to_copy;
+                if self.leftover_cursor == self.leftover_data.len() {
+                    self.leftover_data.clear();
+                    self.leftover_cursor = 0;
+                }
+            }
+            if written == limit {
+                return Ok(written);
+            }
+        }
+
+        // Read new batches from Parquet
+        while written < limit {
+            match self.reader.next() {
+                Some(Ok(batch)) => {
+                    if batch.num_columns() == 0 {
+                        continue;
+                    }
+                    let column = batch.column(0);
+
+                    let list_array = column
+                        .as_any()
+                        .downcast_ref::<ListArray>()
+                        .ok_or_else(|| MahoutError::Io("Failed to downcast to ListArray".to_string()))?;
+
+                    if list_array.len() == 0 {
+                        continue;
+                    }
+
+                    // Extract sample size from first element
+                    let first_value = list_array.value(0);
+                    let float_array = first_value
+                        .as_any()
+                        .downcast_ref::<Float64Array>()
+                        .ok_or_else(|| MahoutError::Io("List values must be Float64".to_string()))?;
+
+                    let current_sample_size = float_array.len();
+
+                    if self.sample_size.is_none() {
+                        self.sample_size = Some(current_sample_size);
+                        limit = calc_limit(current_sample_size);
+                    }

Review Comment:
   nit: I think we could do this in the following for loop with `if i == 0 { ... }`, so that we don't need to extract the first value twice with duplicated logic.
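   
   Roughly (untested sketch; the null-count check from the original loop is elided for brevity):
   
   ```rust
   // Hypothetical restructuring: learn the sample size inside the extraction
   // loop instead of downcasting list_array.value(0) twice.
   let mut batch_values = Vec::new();
   for i in 0..list_array.len() {
       let value_array = list_array.value(i);
       let float_array = value_array
           .as_any()
           .downcast_ref::<Float64Array>()
           .ok_or_else(|| MahoutError::Io("List values must be Float64".to_string()))?;
   
       if i == 0 && self.sample_size.is_none() {
           self.sample_size = Some(float_array.len());
           limit = calc_limit(float_array.len());
       }
   
       batch_values.extend_from_slice(float_array.values());
   }
   ```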



##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
 
     Ok((all_data, num_samples, sample_size))
 }
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+    reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+    sample_size: Option<usize>,
+    leftover_data: Vec<f64>,
+    leftover_cursor: usize,
+    pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+    /// Create a new streaming Parquet reader
+    ///
+    /// Uses batch size 2048 for optimal throughput on large row groups.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let file = File::open(path.as_ref()).map_err(|e| {
+            MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+        })?;
+
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+            MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+        })?;
+
+        let total_rows = builder.metadata().file_metadata().num_rows() as usize;
+
+        let reader = builder
+            .with_batch_size(2048) // Optimized for large row groups
+            .build()
+            .map_err(|e| {
+                MahoutError::Io(format!("Failed to build Parquet reader: {}", e))
+            })?;
+
+        Ok(Self {
+            reader,
+            sample_size: None,
+            leftover_data: Vec::new(),
+            leftover_cursor: 0,
+            total_samples: total_rows,
+        })
+    }
+
+    /// Get the sample size (number of elements per sample)
+    pub fn get_sample_size(&self) -> Option<usize> {
+        self.sample_size
+    }
+
+    /// Read a chunk of data into the provided buffer
+    ///
+    /// Handles leftover data from previous reads and ensures sample boundaries are respected.
+    /// Returns the number of elements written to the buffer.
+    pub fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+        let mut written = 0;
+        let buf_cap = buffer.len();
+        let calc_limit = |ss: usize| -> usize {
+            if ss == 0 {
+                buf_cap
+            } else {
+                (buf_cap / ss) * ss
+            }
+        };
+        let mut limit = self.sample_size.map_or(buf_cap, calc_limit);
+
+        // Drain leftover data from previous read
+        if self.leftover_cursor < self.leftover_data.len() {
+            let available = self.leftover_data.len() - self.leftover_cursor;
+            let space_left = limit - written;
+            let to_copy = std::cmp::min(available, space_left);
+
+            if to_copy > 0 {
+                buffer[written..written+to_copy].copy_from_slice(
+                    &self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy]
+                );
+                written += to_copy;
+                self.leftover_cursor += to_copy;
+                if self.leftover_cursor == self.leftover_data.len() {
+                    self.leftover_data.clear();
+                    self.leftover_cursor = 0;
+                }
+            }
+            if written == limit {
+                return Ok(written);
+            }
+        }
+
+        // Read new batches from Parquet
+        while written < limit {
+            match self.reader.next() {
+                Some(Ok(batch)) => {
+                    if batch.num_columns() == 0 {
+                        continue;
+                    }
+                    let column = batch.column(0);
+
+                    let list_array = column
+                        .as_any()
+                        .downcast_ref::<ListArray>()
+                        .ok_or_else(|| MahoutError::Io("Failed to downcast to ListArray".to_string()))?;
+
+                    if list_array.len() == 0 {
+                        continue;
+                    }
+
+                    // Extract sample size from first element
+                    let first_value = list_array.value(0);
+                    let float_array = first_value
+                        .as_any()
+                        .downcast_ref::<Float64Array>()
+                        .ok_or_else(|| MahoutError::Io("List values must be Float64".to_string()))?;
+
+                    let current_sample_size = float_array.len();
+
+                    if self.sample_size.is_none() {
+                        self.sample_size = Some(current_sample_size);
+                        limit = calc_limit(current_sample_size);
+                    }
+
+                    // Extract all values from this batch
+                    let mut batch_values = Vec::new();
+                    for i in 0..list_array.len() {
+                        let value_array = list_array.value(i);
+                        let float_array = value_array
+                            .as_any()
+                            .downcast_ref::<Float64Array>()
+                            .ok_or_else(|| MahoutError::Io("List values must be Float64".to_string()))?;
+
+                        if float_array.null_count() == 0 {
+                            batch_values.extend_from_slice(float_array.values());
+                        } else {
+                            return Err(MahoutError::Io("Null value encountered in Float64Array during quantum encoding. Please check data quality at the source.".to_string()));
+                        }
+                    }
+
+                    let available = batch_values.len();
+                    let space_left = if written >= limit { 0 } else { limit - written };

Review Comment:
   I think `written >= limit` will never be true here? We don't modify `written` before this line inside the loop body, and the loop condition already guarantees `written < limit`.
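   
   If that invariant holds, the guard could be dropped (untested):
   
   ```rust
   // The loop condition `written < limit` already guarantees this is positive.
   let space_left = limit - written;
   ```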



##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
 
     Ok((all_data, num_samples, sample_size))
 }
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+    reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+    sample_size: Option<usize>,
+    leftover_data: Vec<f64>,
+    leftover_cursor: usize,
+    pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+    /// Create a new streaming Parquet reader
+    ///
+    /// Uses batch size 2048 for optimal throughput on large row groups.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let file = File::open(path.as_ref()).map_err(|e| {
+            MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+        })?;
+
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+            MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+        })?;
+
+        let total_rows = builder.metadata().file_metadata().num_rows() as usize;
+
+        let reader = builder
+            .with_batch_size(2048) // Optimized for large row groups
+            .build()
+            .map_err(|e| {
+                MahoutError::Io(format!("Failed to build Parquet reader: {}", e))
+            })?;
+
+        Ok(Self {
+            reader,
+            sample_size: None,
+            leftover_data: Vec::new(),
+            leftover_cursor: 0,
+            total_samples: total_rows,
+        })
+    }
+
+    /// Get the sample size (number of elements per sample)
+    pub fn get_sample_size(&self) -> Option<usize> {
+        self.sample_size
+    }
+
+    /// Read a chunk of data into the provided buffer
+    ///
+    /// Handles leftover data from previous reads and ensures sample boundaries are respected.
+    /// Returns the number of elements written to the buffer.
+    pub fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+        let mut written = 0;
+        let buf_cap = buffer.len();
+        let calc_limit = |ss: usize| -> usize {
+            if ss == 0 {
+                buf_cap
+            } else {
+                (buf_cap / ss) * ss
+            }
+        };
+        let mut limit = self.sample_size.map_or(buf_cap, calc_limit);
+
+        // Drain leftover data from previous read
+        if self.leftover_cursor < self.leftover_data.len() {
+            let available = self.leftover_data.len() - self.leftover_cursor;
+            let space_left = limit - written;
+            let to_copy = std::cmp::min(available, space_left);
+
+            if to_copy > 0 {
+                buffer[written..written+to_copy].copy_from_slice(
+                    &self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy]
+                );
+                written += to_copy;
+                self.leftover_cursor += to_copy;
+                if self.leftover_cursor == self.leftover_data.len() {
+                    self.leftover_data.clear();
+                    self.leftover_cursor = 0;
+                }
+            }
+            if written == limit {
+                return Ok(written);
+            }
+        }

Review Comment:
   Is it possible that, after executing this block, we still have `self.leftover_cursor != self.leftover_data.len()`? Do we need to make this a loop to ensure all of `leftover_data` is processed?
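   
   A defensive variant could look like this (hypothetical, untested):
   
   ```rust
   // Drain leftovers in a loop until the buffer limit is hit or nothing remains.
   while self.leftover_cursor < self.leftover_data.len() && written < limit {
       let available = self.leftover_data.len() - self.leftover_cursor;
       let to_copy = std::cmp::min(available, limit - written);
       buffer[written..written + to_copy].copy_from_slice(
           &self.leftover_data[self.leftover_cursor..self.leftover_cursor + to_copy],
       );
       written += to_copy;
       self.leftover_cursor += to_copy;
   }
   if self.leftover_cursor == self.leftover_data.len() {
       self.leftover_data.clear();
       self.leftover_cursor = 0;
   }
   ```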



##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +158,160 @@ impl QdpEngine {
     ) -> Result<*mut DLManagedTensor> {
         crate::profile_scope!("Mahout::EncodeFromParquet");
 
-        // Read Parquet directly using Arrow (faster than pandas)
-        let (batch_data, num_samples, sample_size) = {
-            crate::profile_scope!("IO::ReadParquetBatch");
-            crate::io::read_parquet_batch(path)?
-        };
+        #[cfg(target_os = "linux")]
+        {
+            if encoding_method != "amplitude" {
+                return Err(MahoutError::NotImplemented("Only amplitude encoding supported for streaming".into()));
+            }
 
-        // Encode using fused batch kernel
-        self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, encoding_method)
+            // Initialize reader
+            let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+            let num_samples = reader_core.total_samples;
+
+            // Allocate GPU memory once
+            let total_state_vector = GpuStateVector::new_batch(&self.device, num_samples, num_qubits)?;
+
+            // Initialize dual-stream pipeline context
+            let ctx = PipelineContext::new(&self.device)?;
+
+            // Double-buffered device input (ping-pong)
+            let dev_in_a = unsafe { self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?;
+            let dev_in_b = unsafe { self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?;
+
+            // Setup Producer-Consumer channels
+            let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer, usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+            let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>, Receiver<PinnedBuffer>) = sync_channel(2);
+
+            // CRITICAL FIX: Pre-read first chunk to determine sample_size
+            // This data must be processed, not discarded!
+            let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+            let first_len = reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+            let sample_size = reader_core.get_sample_size()
+                .ok_or_else(|| MahoutError::InvalidInput("Could not determine sample size".into()))?;
+
+            // Send first chunk directly to GPU loop (must be processed first)
+            full_buf_tx.send((host_buf_first, first_len))
+                .map_err(|_| MahoutError::Io("Failed to send first buffer".into()))?;
+
+            // Send one empty buffer to IO thread for subsequent reads
+            empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+                .map_err(|_| MahoutError::Io("Failed to send second buffer".into()))?;
+
+            // Spawn IO thread (Producer): continues reading from second chunk onwards
+            let mut reader = reader_core;
+            let io_handle = thread::spawn(move || {
+                loop {
+                    let mut buffer = match empty_buf_rx.recv() {
+                        Ok(b) => b,
+                        Err(_) => break,
+                    };
+
+                    let len = match reader.read_chunk(buffer.as_slice_mut()) {
+                        Ok(l) => l,
+                        Err(e) => { eprintln!("IO Error: {:?}", e); 0 }

Review Comment:
   I think it's better to propagate the error rather than only printing it, so that users are able to capture the IO error.
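   
   For example (untested sketch; assumes the channel payload is widened to `Result<(PinnedBuffer, usize)>` so the consumer can surface the error):
   
   ```rust
   // Hypothetical: forward the IO error to the consumer instead of logging it
   // and silently treating the failed read as EOF.
   let len = match reader.read_chunk(buffer.as_slice_mut()) {
       Ok(l) => l,
       Err(e) => {
           let _ = full_buf_tx.send(Err(e)); // consumer returns this as Err
           break;
       }
   };
   ```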



##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +158,160 @@ impl QdpEngine {
     ) -> Result<*mut DLManagedTensor> {
         crate::profile_scope!("Mahout::EncodeFromParquet");
 
-        // Read Parquet directly using Arrow (faster than pandas)
-        let (batch_data, num_samples, sample_size) = {
-            crate::profile_scope!("IO::ReadParquetBatch");
-            crate::io::read_parquet_batch(path)?
-        };
+        #[cfg(target_os = "linux")]
+        {
+            if encoding_method != "amplitude" {
+                return Err(MahoutError::NotImplemented("Only amplitude encoding supported for streaming".into()));
+            }
 
-        // Encode using fused batch kernel
-        self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, encoding_method)
+            // Initialize reader
+            let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+            let num_samples = reader_core.total_samples;
+
+            // Allocate GPU memory once
+            let total_state_vector = GpuStateVector::new_batch(&self.device, num_samples, num_qubits)?;
+
+            // Initialize dual-stream pipeline context
+            let ctx = PipelineContext::new(&self.device)?;
+
+            // Double-buffered device input (ping-pong)
+            let dev_in_a = unsafe { self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?;
+            let dev_in_b = unsafe { self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?;
+
+            // Setup Producer-Consumer channels
+            let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer, usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+            let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>, Receiver<PinnedBuffer>) = sync_channel(2);
+
+            // CRITICAL FIX: Pre-read first chunk to determine sample_size
+            // This data must be processed, not discarded!
+            let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+            let first_len = reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+            let sample_size = reader_core.get_sample_size()

Review Comment:
   nit: if `sample_size == 0`, do we need to perform the following logic at all?
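   
   A sketch of an early guard (hypothetical error message):
   
   ```rust
   // If every list in the file is empty, bail out before allocating GPU
   // buffers or spawning the IO thread.
   if sample_size == 0 {
       return Err(MahoutError::InvalidInput(
           "Parquet file contains only empty samples".into(),
       ));
   }
   ```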



##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
 
     Ok((all_data, num_samples, sample_size))
 }
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+    reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+    sample_size: Option<usize>,
+    leftover_data: Vec<f64>,
+    leftover_cursor: usize,
+    pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+    /// Create a new streaming Parquet reader
+    ///
+    /// Uses batch size 2048 for optimal throughput on large row groups.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let file = File::open(path.as_ref()).map_err(|e| {
+            MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+        })?;
+
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+            MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+        })?;
+
+        let total_rows = builder.metadata().file_metadata().num_rows() as usize;
+
+        let reader = builder
+            .with_batch_size(2048) // Optimized for large row groups
+            .build()
+            .map_err(|e| {
+                MahoutError::Io(format!("Failed to build Parquet reader: {}", e))
+            })?;
+
+        Ok(Self {
+            reader,
+            sample_size: None,
+            leftover_data: Vec::new(),
+            leftover_cursor: 0,
+            total_samples: total_rows,
+        })
+    }
+
+    /// Get the sample size (number of elements per sample)
+    pub fn get_sample_size(&self) -> Option<usize> {
+        self.sample_size
+    }
+
+    /// Read a chunk of data into the provided buffer
+    ///
+    /// Handles leftover data from previous reads and ensures sample boundaries are respected.
+    /// Returns the number of elements written to the buffer.
+    pub fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+        let mut written = 0;
+        let buf_cap = buffer.len();
+        let calc_limit = |ss: usize| -> usize {
+            if ss == 0 {
+                buf_cap
+            } else {
+                (buf_cap / ss) * ss
+            }
+        };
+        let mut limit = self.sample_size.map_or(buf_cap, calc_limit);
+
+        // Drain leftover data from previous read
+        if self.leftover_cursor < self.leftover_data.len() {
+            let available = self.leftover_data.len() - self.leftover_cursor;
+            let space_left = limit - written;
+            let to_copy = std::cmp::min(available, space_left);
+
+            if to_copy > 0 {
+                buffer[written..written+to_copy].copy_from_slice(
+                    &self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy]
+                );
+                written += to_copy;
+                self.leftover_cursor += to_copy;
+                if self.leftover_cursor == self.leftover_data.len() {
+                    self.leftover_data.clear();
+                    self.leftover_cursor = 0;
+                }
+            }
+            if written == limit {
+                return Ok(written);
+            }
+        }
+
+        // Read new batches from Parquet
+        while written < limit {
+            match self.reader.next() {
+                Some(Ok(batch)) => {
+                    if batch.num_columns() == 0 {
+                        continue;
+                    }
+                    let column = batch.column(0);
+
+                    let list_array = column
+                        .as_any()
+                        .downcast_ref::<ListArray>()
+                        .ok_or_else(|| MahoutError::Io("Failed to downcast to ListArray".to_string()))?;
+
+                    if list_array.len() == 0 {
+                        continue;
+                    }
+
+                    // Extract sample size from first element
+                    let first_value = list_array.value(0);
+                    let float_array = first_value
+                        .as_any()
+                        .downcast_ref::<Float64Array>()
+                        .ok_or_else(|| MahoutError::Io("List values must be Float64".to_string()))?;
+
+                    let current_sample_size = float_array.len();
+
+                    if self.sample_size.is_none() {

Review Comment:
   If `sample_size` is not `None`, do we need to check that `current_sample_size` is the same as `self.sample_size`? Are we requiring all sample sizes to be the same?
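   
   If fixed-size samples are required, something like this (untested) could enforce it:
   
   ```rust
   // Hypothetical consistency check across batches.
   match self.sample_size {
       None => self.sample_size = Some(current_sample_size),
       Some(expected) if expected != current_sample_size => {
           return Err(MahoutError::Io(format!(
               "Inconsistent sample size: expected {}, got {}",
               expected, current_sample_size
           )));
       }
       Some(_) => {}
   }
   ```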



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
