Copilot commented on code in PR #708:
URL: https://github.com/apache/mahout/pull/708#discussion_r2613122645


##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +166,160 @@ impl QdpEngine {
     ) -> Result<*mut DLManagedTensor> {
         crate::profile_scope!("Mahout::EncodeFromParquet");
 
-        // Read Parquet directly using Arrow (faster than pandas)
-        let (batch_data, num_samples, sample_size) = {
-            crate::profile_scope!("IO::ReadParquetBatch");
-            crate::io::read_parquet_batch(path)?
-        };
+        #[cfg(target_os = "linux")]
+        {
+            if encoding_method != "amplitude" {
+                return Err(MahoutError::NotImplemented("Only amplitude 
encoding supported for streaming".into()));
+            }
 
-        // Encode using fused batch kernel
-        self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, 
encoding_method)
+            // Initialize reader
+            let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+            let num_samples = reader_core.total_samples;
+
+            // Allocate GPU memory once
+            let total_state_vector = GpuStateVector::new_batch(&self.device, 
num_samples, num_qubits)?;
+
+            // Initialize dual-stream pipeline context
+            let ctx = PipelineContext::new(&self.device)?;
+
+            // Double-buffered device input (ping-pong)
+            let dev_in_a = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+            let dev_in_b = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+
+            // Setup Producer-Consumer channels
+            let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer, 
usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+            let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>, 
Receiver<PinnedBuffer>) = sync_channel(2);
+
+            // CRITICAL FIX: Pre-read first chunk to determine sample_size
+            // This data must be processed, not discarded!
+            let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+            let first_len = 
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+            let sample_size = reader_core.get_sample_size()
+                .ok_or_else(|| MahoutError::InvalidInput("Could not determine 
sample size".into()))?;
+
+            // Send first chunk directly to GPU loop (must be processed first)
+            full_buf_tx.send((host_buf_first, first_len))
+                .map_err(|_| MahoutError::Io("Failed to send first 
buffer".into()))?;
+
+            // Send one empty buffer to IO thread for subsequent reads
+            empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+                .map_err(|_| MahoutError::Io("Failed to send second 
buffer".into()))?;
+
+            // Spawn IO thread (Producer): continues reading from second chunk 
onwards
+            let mut reader = reader_core;
+            let io_handle = thread::spawn(move || {
+                loop {
+                    let mut buffer = match empty_buf_rx.recv() {
+                        Ok(b) => b,
+                        Err(_) => break,
+                    };
+
+                    let len = match reader.read_chunk(buffer.as_slice_mut()) {
+                        Ok(l) => l,
+                        Err(e) => { eprintln!("IO Error: {:?}", e); 0 }

Review Comment:
   The error from the IO thread is printed to stderr but processing continues. 
If the IO thread encounters an error, the GPU loop may hang indefinitely 
waiting for data that will never arrive. Consider propagating the error through 
the channel or setting a flag that the main thread can check, so the GPU loop 
can exit gracefully with an appropriate error message.



##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +166,160 @@ impl QdpEngine {
     ) -> Result<*mut DLManagedTensor> {
         crate::profile_scope!("Mahout::EncodeFromParquet");
 
-        // Read Parquet directly using Arrow (faster than pandas)
-        let (batch_data, num_samples, sample_size) = {
-            crate::profile_scope!("IO::ReadParquetBatch");
-            crate::io::read_parquet_batch(path)?
-        };
+        #[cfg(target_os = "linux")]
+        {
+            if encoding_method != "amplitude" {
+                return Err(MahoutError::NotImplemented("Only amplitude 
encoding supported for streaming".into()));
+            }
 
-        // Encode using fused batch kernel
-        self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, 
encoding_method)
+            // Initialize reader
+            let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+            let num_samples = reader_core.total_samples;
+
+            // Allocate GPU memory once
+            let total_state_vector = GpuStateVector::new_batch(&self.device, 
num_samples, num_qubits)?;
+
+            // Initialize dual-stream pipeline context
+            let ctx = PipelineContext::new(&self.device)?;
+
+            // Double-buffered device input (ping-pong)
+            let dev_in_a = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+            let dev_in_b = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+
+            // Setup Producer-Consumer channels
+            let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer, 
usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+            let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>, 
Receiver<PinnedBuffer>) = sync_channel(2);
+
+            // CRITICAL FIX: Pre-read first chunk to determine sample_size
+            // This data must be processed, not discarded!
+            let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+            let first_len = 
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+            let sample_size = reader_core.get_sample_size()
+                .ok_or_else(|| MahoutError::InvalidInput("Could not determine 
sample size".into()))?;
+
+            // Send first chunk directly to GPU loop (must be processed first)
+            full_buf_tx.send((host_buf_first, first_len))
+                .map_err(|_| MahoutError::Io("Failed to send first 
buffer".into()))?;
+
+            // Send one empty buffer to IO thread for subsequent reads
+            empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+                .map_err(|_| MahoutError::Io("Failed to send second 
buffer".into()))?;
+
+            // Spawn IO thread (Producer): continues reading from second chunk 
onwards
+            let mut reader = reader_core;
+            let io_handle = thread::spawn(move || {
+                loop {
+                    let mut buffer = match empty_buf_rx.recv() {
+                        Ok(b) => b,
+                        Err(_) => break,
+                    };
+
+                    let len = match reader.read_chunk(buffer.as_slice_mut()) {
+                        Ok(l) => l,
+                        Err(e) => { eprintln!("IO Error: {:?}", e); 0 }
+                    };
+
+                    if full_buf_tx.send((buffer, len)).is_err() { break; }
+                    if len == 0 { break; }
+                }
+            });
+
+            // GPU processing loop: receives pre-read chunk, then IO thread 
chunks
+            let mut global_sample_offset = 0;
+            let mut use_dev_a = true;
+            let state_len_per_sample = 1 << num_qubits;
+
+            loop {
+                let (host_buffer, current_len) = full_buf_rx.recv()
+                    .map_err(|_| MahoutError::Io("IO thread 
disconnected".into()))?;
+
+                // len == 0 means IO thread finished (don't recycle buffer)
+                if current_len == 0 { break; }
+
+                let samples_in_chunk = current_len / sample_size;
+                if samples_in_chunk > 0 {
+                    let dev_ptr = if use_dev_a { *dev_in_a.device_ptr() } else 
{ *dev_in_b.device_ptr() };
+
+                    unsafe {
+                        crate::profile_scope!("GPU::Dispatch");
+
+                        // Async H2D copy → record event → wait for copy → 
launch kernel
+                        ctx.async_copy_to_device(&host_buffer, dev_ptr as *mut 
c_void, current_len);
+                        ctx.record_copy_done();
+                        ctx.wait_for_copy();
+
+                        // Compute norms and encode batch
+                        {
+                            crate::profile_scope!("GPU::BatchEncode");
+                            let offset_elements = global_sample_offset * 
state_len_per_sample;
+                            let state_ptr_offset = 
total_state_vector.ptr().cast::<u8>()
+                                .add(offset_elements * 
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+                                .cast::<std::ffi::c_void>();
+
+                            // Allocate norm buffer for this chunk
+                            let mut norm_buffer = 
self.device.alloc_zeros::<f64>(samples_in_chunk)
+                                .map_err(|e| 
MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", 
e)))?;
+
+                            // Step 1: Compute L2 norms for this chunk
+                            {
+                                crate::profile_scope!("GPU::NormBatch");
+                                let ret = launch_l2_norm_batch(
+                                    dev_ptr as *const f64,
+                                    samples_in_chunk,
+                                    sample_size,
+                                    *norm_buffer.device_ptr_mut() as *mut f64,
+                                    ctx.stream_compute.stream as *mut c_void
+                                );
+                                if ret != 0 {
+                                    return 
Err(MahoutError::KernelLaunch(format!("Norm kernel error: {}", ret)));
+                                }
+                            }
+
+                            // Step 2: Encode batch using computed norms
+                            {
+                                crate::profile_scope!("GPU::EncodeBatch");
+                                let ret = launch_amplitude_encode_batch(
+                                    dev_ptr as *const f64,
+                                    state_ptr_offset,
+                                    *norm_buffer.device_ptr() as *const f64,
+                                    samples_in_chunk,
+                                    sample_size,
+                                    state_len_per_sample,
+                                    ctx.stream_compute.stream as *mut c_void
+                                );
+                                if ret != 0 {
+                                    return 
Err(MahoutError::KernelLaunch(format!("Encode kernel error: {}", ret)));
+                                }
+                            }
+                        }
+
+                        // Sync copy stream before buffer reuse
+                        ctx.sync_copy_stream();
+                    }
+                    global_sample_offset += samples_in_chunk;
+                    use_dev_a = !use_dev_a;
+                }
+
+                // Return buffer to IO thread (ignore errors if thread exited)
+                let _ = empty_buf_tx.send(host_buffer);
+            }
+
+            self.device.synchronize().map_err(|e| 
MahoutError::Cuda(format!("{:?}", e)))?;
+            let _ = io_handle.join();
+
+            // Transfer ownership to DLPack (Arc handles ref counting)
+            let dlpack_ptr = total_state_vector.to_dlpack();
+            Ok(dlpack_ptr)
+        }
+
+        #[cfg(not(target_os = "linux"))]
+        {
+            let (batch_data, num_samples, sample_size) = 
crate::io::read_parquet_batch(path)?;
+            self.encode_batch(&batch_data, num_samples, sample_size, 
num_qubits, encoding_method)
+        }
     }

Review Comment:
   The new streaming pipeline in encode_from_parquet lacks test coverage. This 
is a complex producer-consumer implementation with multi-threading, CUDA 
streams, and error handling that should be thoroughly tested. Consider adding 
tests for: empty files, single-chunk files, multi-chunk files, error scenarios 
in IO thread, and correctness of the encoded output compared to the 
non-streaming implementation.



##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +166,160 @@ impl QdpEngine {
     ) -> Result<*mut DLManagedTensor> {
         crate::profile_scope!("Mahout::EncodeFromParquet");
 
-        // Read Parquet directly using Arrow (faster than pandas)
-        let (batch_data, num_samples, sample_size) = {
-            crate::profile_scope!("IO::ReadParquetBatch");
-            crate::io::read_parquet_batch(path)?
-        };
+        #[cfg(target_os = "linux")]
+        {
+            if encoding_method != "amplitude" {
+                return Err(MahoutError::NotImplemented("Only amplitude 
encoding supported for streaming".into()));
+            }
 
-        // Encode using fused batch kernel
-        self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, 
encoding_method)
+            // Initialize reader
+            let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+            let num_samples = reader_core.total_samples;
+
+            // Allocate GPU memory once
+            let total_state_vector = GpuStateVector::new_batch(&self.device, 
num_samples, num_qubits)?;
+
+            // Initialize dual-stream pipeline context
+            let ctx = PipelineContext::new(&self.device)?;
+
+            // Double-buffered device input (ping-pong)
+            let dev_in_a = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+            let dev_in_b = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+
+            // Setup Producer-Consumer channels
+            let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer, 
usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+            let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>, 
Receiver<PinnedBuffer>) = sync_channel(2);
+
+            // CRITICAL FIX: Pre-read first chunk to determine sample_size
+            // This data must be processed, not discarded!
+            let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+            let first_len = 
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+            let sample_size = reader_core.get_sample_size()
+                .ok_or_else(|| MahoutError::InvalidInput("Could not determine 
sample size".into()))?;
+
+            // Send first chunk directly to GPU loop (must be processed first)
+            full_buf_tx.send((host_buf_first, first_len))
+                .map_err(|_| MahoutError::Io("Failed to send first 
buffer".into()))?;
+
+            // Send one empty buffer to IO thread for subsequent reads
+            empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+                .map_err(|_| MahoutError::Io("Failed to send second 
buffer".into()))?;
+
+            // Spawn IO thread (Producer): continues reading from second chunk 
onwards
+            let mut reader = reader_core;
+            let io_handle = thread::spawn(move || {
+                loop {
+                    let mut buffer = match empty_buf_rx.recv() {
+                        Ok(b) => b,
+                        Err(_) => break,
+                    };
+
+                    let len = match reader.read_chunk(buffer.as_slice_mut()) {
+                        Ok(l) => l,
+                        Err(e) => { eprintln!("IO Error: {:?}", e); 0 }
+                    };
+
+                    if full_buf_tx.send((buffer, len)).is_err() { break; }
+                    if len == 0 { break; }
+                }
+            });
+
+            // GPU processing loop: receives pre-read chunk, then IO thread 
chunks
+            let mut global_sample_offset = 0;
+            let mut use_dev_a = true;
+            let state_len_per_sample = 1 << num_qubits;
+
+            loop {
+                let (host_buffer, current_len) = full_buf_rx.recv()
+                    .map_err(|_| MahoutError::Io("IO thread 
disconnected".into()))?;
+
+                // len == 0 means IO thread finished (don't recycle buffer)
+                if current_len == 0 { break; }
+
+                let samples_in_chunk = current_len / sample_size;
+                if samples_in_chunk > 0 {
+                    let dev_ptr = if use_dev_a { *dev_in_a.device_ptr() } else 
{ *dev_in_b.device_ptr() };
+
+                    unsafe {
+                        crate::profile_scope!("GPU::Dispatch");
+
+                        // Async H2D copy → record event → wait for copy → 
launch kernel
+                        ctx.async_copy_to_device(&host_buffer, dev_ptr as *mut 
c_void, current_len);
+                        ctx.record_copy_done();
+                        ctx.wait_for_copy();
+
+                        // Compute norms and encode batch
+                        {
+                            crate::profile_scope!("GPU::BatchEncode");
+                            let offset_elements = global_sample_offset * 
state_len_per_sample;
+                            let state_ptr_offset = 
total_state_vector.ptr().cast::<u8>()
+                                .add(offset_elements * 
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+                                .cast::<std::ffi::c_void>();
+
+                            // Allocate norm buffer for this chunk
+                            let mut norm_buffer = 
self.device.alloc_zeros::<f64>(samples_in_chunk)
+                                .map_err(|e| 
MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", 
e)))?;

Review Comment:
   The norm buffer allocation happens inside the processing loop for each 
chunk. This means the buffer is allocated and deallocated repeatedly for every 
chunk processed. Consider allocating the norm buffer once outside the loop with 
a size of STAGE_SIZE_ELEMENTS / sample_size to avoid repeated allocations which 
can fragment memory and reduce performance.



##########
qdp/qdp-core/src/gpu/pipeline.rs:
##########
@@ -24,7 +24,104 @@ use std::ffi::c_void;
 use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
 use crate::error::{MahoutError, Result};
 #[cfg(target_os = "linux")]
-use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
+use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error, 
PinnedBuffer};
+
+// CUDA FFI: centralized here to keep lib.rs clean
+#[cfg(target_os = "linux")]
+unsafe extern "C" {
+    fn cudaMemcpyAsync(dst: *mut c_void, src: *const c_void, count: usize, 
kind: u32, stream: *mut c_void) -> i32;
+    fn cudaEventCreateWithFlags(event: *mut *mut c_void, flags: u32) -> i32;
+    fn cudaEventRecord(event: *mut c_void, stream: *mut c_void) -> i32;
+    fn cudaEventDestroy(event: *mut c_void) -> i32;
+    fn cudaStreamWaitEvent(stream: *mut c_void, event: *mut c_void, flags: 
u32) -> i32;
+    fn cudaStreamSynchronize(stream: *mut c_void) -> i32;
+}
+
+#[cfg(target_os = "linux")]
+const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1;
+#[cfg(target_os = "linux")]
+const CUDA_EVENT_DISABLE_TIMING: u32 = 0x02;
+
+/// Dual-stream pipeline context: manages compute/copy streams and sync events
+#[cfg(target_os = "linux")]
+pub struct PipelineContext {
+    pub stream_compute: CudaStream,
+    pub stream_copy: CudaStream,
+    event_copy_done: *mut c_void,
+}
+
+#[cfg(target_os = "linux")]
+impl PipelineContext {
+    /// Create dual streams and sync event
+    pub fn new(device: &Arc<CudaDevice>) -> Result<Self> {
+        let stream_compute = device.fork_default_stream()
+            .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+        let stream_copy = device.fork_default_stream()
+            .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+
+        let mut event_copy_done: *mut c_void = std::ptr::null_mut();
+        unsafe {
+            let ret = cudaEventCreateWithFlags(&mut event_copy_done, 
CUDA_EVENT_DISABLE_TIMING);
+            if ret != 0 {
+                return Err(MahoutError::Cuda(format!("Failed to create CUDA 
event: {}", ret)));
+            }
+        }
+
+        Ok(Self {
+            stream_compute,
+            stream_copy,
+            event_copy_done,
+        })
+    }
+
+    /// Async H2D copy on copy stream
+    pub unsafe fn async_copy_to_device(&self, src: &PinnedBuffer, dst: *mut 
c_void, len_elements: usize) {
+        crate::profile_scope!("GPU::H2D_Copy");
+        unsafe {
+            cudaMemcpyAsync(
+                dst,
+                src.ptr() as *const c_void,
+                len_elements * std::mem::size_of::<f64>(),
+                CUDA_MEMCPY_HOST_TO_DEVICE,
+                self.stream_copy.stream as *mut c_void
+            );
+        }

Review Comment:
   The cudaMemcpyAsync return value is not checked. If the memory copy fails, 
this will silently continue and potentially cause downstream errors that are 
harder to diagnose. Add error checking for the return value and return an 
appropriate error to ensure failures are caught early.
   ```suggestion
       pub unsafe fn async_copy_to_device(&self, src: &PinnedBuffer, dst: *mut 
c_void, len_elements: usize) -> Result<()> {
           crate::profile_scope!("GPU::H2D_Copy");
           unsafe {
               let ret = cudaMemcpyAsync(
                   dst,
                   src.ptr() as *const c_void,
                   len_elements * std::mem::size_of::<f64>(),
                   CUDA_MEMCPY_HOST_TO_DEVICE,
                   self.stream_copy.stream as *mut c_void
               );
               if ret != 0 {
                   return Err(MahoutError::Cuda(format!("cudaMemcpyAsync failed 
with error code: {}", ret)));
               }
           }
           Ok(())
   ```



##########
qdp/qdp-core/src/gpu/pipeline.rs:
##########
@@ -24,7 +24,104 @@ use std::ffi::c_void;
 use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
 use crate::error::{MahoutError, Result};
 #[cfg(target_os = "linux")]
-use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
+use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error, 
PinnedBuffer};
+
+// CUDA FFI: centralized here to keep lib.rs clean
+#[cfg(target_os = "linux")]
+unsafe extern "C" {
+    fn cudaMemcpyAsync(dst: *mut c_void, src: *const c_void, count: usize, 
kind: u32, stream: *mut c_void) -> i32;
+    fn cudaEventCreateWithFlags(event: *mut *mut c_void, flags: u32) -> i32;
+    fn cudaEventRecord(event: *mut c_void, stream: *mut c_void) -> i32;
+    fn cudaEventDestroy(event: *mut c_void) -> i32;
+    fn cudaStreamWaitEvent(stream: *mut c_void, event: *mut c_void, flags: 
u32) -> i32;
+    fn cudaStreamSynchronize(stream: *mut c_void) -> i32;
+}
+
+#[cfg(target_os = "linux")]
+const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1;
+#[cfg(target_os = "linux")]
+const CUDA_EVENT_DISABLE_TIMING: u32 = 0x02;
+
+/// Dual-stream pipeline context: manages compute/copy streams and sync events
+#[cfg(target_os = "linux")]
+pub struct PipelineContext {
+    pub stream_compute: CudaStream,
+    pub stream_copy: CudaStream,
+    event_copy_done: *mut c_void,
+}
+
+#[cfg(target_os = "linux")]
+impl PipelineContext {
+    /// Create dual streams and sync event
+    pub fn new(device: &Arc<CudaDevice>) -> Result<Self> {
+        let stream_compute = device.fork_default_stream()
+            .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+        let stream_copy = device.fork_default_stream()
+            .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+
+        let mut event_copy_done: *mut c_void = std::ptr::null_mut();
+        unsafe {
+            let ret = cudaEventCreateWithFlags(&mut event_copy_done, 
CUDA_EVENT_DISABLE_TIMING);
+            if ret != 0 {
+                return Err(MahoutError::Cuda(format!("Failed to create CUDA 
event: {}", ret)));
+            }
+        }
+
+        Ok(Self {
+            stream_compute,
+            stream_copy,
+            event_copy_done,
+        })
+    }
+
+    /// Async H2D copy on copy stream
+    pub unsafe fn async_copy_to_device(&self, src: &PinnedBuffer, dst: *mut 
c_void, len_elements: usize) {
+        crate::profile_scope!("GPU::H2D_Copy");
+        unsafe {
+            cudaMemcpyAsync(
+                dst,
+                src.ptr() as *const c_void,
+                len_elements * std::mem::size_of::<f64>(),
+                CUDA_MEMCPY_HOST_TO_DEVICE,
+                self.stream_copy.stream as *mut c_void
+            );
+        }
+    }
+
+    /// Record copy completion event
+    pub unsafe fn record_copy_done(&self) {
+        unsafe {
+            cudaEventRecord(self.event_copy_done, self.stream_copy.stream as 
*mut c_void);
+        }
+    }
+
+    /// Make compute stream wait for copy completion
+    pub unsafe fn wait_for_copy(&self) {
+        crate::profile_scope!("GPU::StreamWait");
+        unsafe {
+            cudaStreamWaitEvent(self.stream_compute.stream as *mut c_void, 
self.event_copy_done, 0);
+        }

Review Comment:
   The cudaEventRecord and cudaStreamWaitEvent calls at lines 94 and 102 don't 
check their return values. If these operations fail, the stream synchronization 
may not work correctly, leading to race conditions where the compute stream 
starts before data transfer completes. Add error checking and propagate errors 
appropriately.
   ```suggestion
       pub unsafe fn record_copy_done(&self) -> Result<()> {
           unsafe {
               let ret = cudaEventRecord(self.event_copy_done, 
self.stream_copy.stream as *mut c_void);
               if ret != 0 {
                   return Err(MahoutError::Cuda(format!("Failed to record CUDA 
event: {}", ret)));
               }
           }
           Ok(())
       }
   
       /// Make compute stream wait for copy completion
       pub unsafe fn wait_for_copy(&self) -> Result<()> {
           crate::profile_scope!("GPU::StreamWait");
           unsafe {
               let ret = cudaStreamWaitEvent(self.stream_compute.stream as *mut 
c_void, self.event_copy_done, 0);
               if ret != 0 {
                   return Err(MahoutError::Cuda(format!("Failed to wait for 
CUDA event: {}", ret)));
               }
           }
           Ok(())
   ```



##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> 
Result<(Vec<f64>, usize,
 
     Ok((all_data, num_samples, sample_size))
 }
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+    reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+    sample_size: Option<usize>,
+    leftover_data: Vec<f64>,
+    leftover_cursor: usize,
+    pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+    /// Create a new streaming Parquet reader
+    ///
+    /// Uses batch size 2048 for optimal throughput on large row groups.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let file = File::open(path.as_ref()).map_err(|e| {
+            MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+        })?;
+
+        let builder = 
ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+            MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+        })?;
+
+        let total_rows = builder.metadata().file_metadata().num_rows() as 
usize;
+
+        let reader = builder
+            .with_batch_size(2048) // Optimized for large row groups

Review Comment:
   The magic number 2048 for batch size is documented as "Optimized for large 
row groups" but lacks explanation of why this specific value was chosen. 
Consider documenting the rationale (e.g., memory/performance tradeoffs tested) 
or making it configurable if different datasets might benefit from different 
batch sizes.



##########
qdp/qdp-core/src/lib.rs:
##########
@@ -26,10 +26,28 @@ mod profiling;
 pub use error::{MahoutError, Result};
 
 use std::sync::Arc;
+#[cfg(target_os = "linux")]
+use std::ffi::c_void;
+#[cfg(target_os = "linux")]
+use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
+#[cfg(target_os = "linux")]
+use std::thread;
 
-use cudarc::driver::CudaDevice;
+use cudarc::driver::{CudaDevice, DevicePtr, DevicePtrMut};
 use crate::dlpack::DLManagedTensor;
 use crate::gpu::get_encoder;
+#[cfg(target_os = "linux")]
+use crate::gpu::memory::{PinnedBuffer, GpuStateVector};
+#[cfg(target_os = "linux")]
+use crate::gpu::PipelineContext;
+#[cfg(target_os = "linux")]
+use qdp_kernels::{launch_l2_norm_batch, launch_amplitude_encode_batch};
+
+/// 512MB staging buffer for large Parquet row groups (reduces fragmentation)

Review Comment:
   The constant STAGE_SIZE_BYTES is set to 512MB but there's no documentation 
explaining why this size was chosen or how it should be adjusted for different 
system configurations. Consider adding a comment explaining the rationale 
(e.g., typical GPU memory, PCIe bandwidth considerations) and any constraints 
or recommendations for different hardware setups.
   ```suggestion
   /// 512MB staging buffer for large Parquet row groups (reduces fragmentation)
   ///
   /// Rationale:
   /// - 512MB is chosen as a balance between minimizing host-GPU transfer 
overhead and avoiding excessive GPU memory usage.
   /// - This size is suitable for most modern GPUs, which typically have 8GB 
or more of memory, and aligns well with PCIe transfer sizes for efficient 
bandwidth utilization.
   /// - Larger buffers can reduce fragmentation and improve throughput for 
large datasets, but may not fit on GPUs with less memory.
   /// - If running on GPUs with less memory (e.g., 4GB or less), or if 
multiple processes share the GPU, consider reducing this value to avoid 
out-of-memory errors.
   /// - Conversely, for high-end GPUs or systems with very large datasets, 
increasing this value may further improve performance, but test for 
fragmentation and memory pressure.
   ```



##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +166,160 @@ impl QdpEngine {
     ) -> Result<*mut DLManagedTensor> {
         crate::profile_scope!("Mahout::EncodeFromParquet");
 
-        // Read Parquet directly using Arrow (faster than pandas)
-        let (batch_data, num_samples, sample_size) = {
-            crate::profile_scope!("IO::ReadParquetBatch");
-            crate::io::read_parquet_batch(path)?
-        };
+        #[cfg(target_os = "linux")]
+        {
+            if encoding_method != "amplitude" {
+                return Err(MahoutError::NotImplemented("Only amplitude 
encoding supported for streaming".into()));
+            }
 
-        // Encode using fused batch kernel
-        self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, 
encoding_method)
+            // Initialize reader
+            let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+            let num_samples = reader_core.total_samples;
+
+            // Allocate GPU memory once
+            let total_state_vector = GpuStateVector::new_batch(&self.device, 
num_samples, num_qubits)?;
+
+            // Initialize dual-stream pipeline context
+            let ctx = PipelineContext::new(&self.device)?;
+
+            // Double-buffered device input (ping-pong)
+            let dev_in_a = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+            let dev_in_b = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+
+            // Setup Producer-Consumer channels
+            let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer, 
usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+            let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>, 
Receiver<PinnedBuffer>) = sync_channel(2);
+
+            // CRITICAL FIX: Pre-read first chunk to determine sample_size
+            // This data must be processed, not discarded!
+            let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+            let first_len = 
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+            let sample_size = reader_core.get_sample_size()
+                .ok_or_else(|| MahoutError::InvalidInput("Could not determine 
sample size".into()))?;

Review Comment:
   The sample_size is determined after reading the first chunk, but if the 
Parquet file is empty or the first chunk contains no data, sample_size will be 
None. This would cause the ok_or_else to return an error, but only after 
allocating GPU memory and spawning threads. Consider checking if the file is 
empty before allocating resources, or handle the case where the first read 
returns 0 elements.



##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +166,160 @@ impl QdpEngine {
     ) -> Result<*mut DLManagedTensor> {
         crate::profile_scope!("Mahout::EncodeFromParquet");
 
-        // Read Parquet directly using Arrow (faster than pandas)
-        let (batch_data, num_samples, sample_size) = {
-            crate::profile_scope!("IO::ReadParquetBatch");
-            crate::io::read_parquet_batch(path)?
-        };
+        #[cfg(target_os = "linux")]
+        {
+            if encoding_method != "amplitude" {
+                return Err(MahoutError::NotImplemented("Only amplitude 
encoding supported for streaming".into()));
+            }
 
-        // Encode using fused batch kernel
-        self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, 
encoding_method)
+            // Initialize reader
+            let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+            let num_samples = reader_core.total_samples;
+
+            // Allocate GPU memory once
+            let total_state_vector = GpuStateVector::new_batch(&self.device, 
num_samples, num_qubits)?;
+
+            // Initialize dual-stream pipeline context
+            let ctx = PipelineContext::new(&self.device)?;
+
+            // Double-buffered device input (ping-pong)
+            let dev_in_a = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+            let dev_in_b = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+
+            // Setup Producer-Consumer channels
+            let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer, 
usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+            let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>, 
Receiver<PinnedBuffer>) = sync_channel(2);
+
+            // CRITICAL FIX: Pre-read first chunk to determine sample_size
+            // This data must be processed, not discarded!
+            let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+            let first_len = 
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+            let sample_size = reader_core.get_sample_size()
+                .ok_or_else(|| MahoutError::InvalidInput("Could not determine 
sample size".into()))?;
+
+            // Send first chunk directly to GPU loop (must be processed first)
+            full_buf_tx.send((host_buf_first, first_len))
+                .map_err(|_| MahoutError::Io("Failed to send first 
buffer".into()))?;
+
+            // Send one empty buffer to IO thread for subsequent reads
+            empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+                .map_err(|_| MahoutError::Io("Failed to send second 
buffer".into()))?;
+
+            // Spawn IO thread (Producer): continues reading from second chunk 
onwards
+            let mut reader = reader_core;
+            let io_handle = thread::spawn(move || {
+                loop {
+                    let mut buffer = match empty_buf_rx.recv() {
+                        Ok(b) => b,
+                        Err(_) => break,
+                    };
+
+                    let len = match reader.read_chunk(buffer.as_slice_mut()) {
+                        Ok(l) => l,
+                        Err(e) => { eprintln!("IO Error: {:?}", e); 0 }
+                    };
+
+                    if full_buf_tx.send((buffer, len)).is_err() { break; }
+                    if len == 0 { break; }
+                }
+            });
+
+            // GPU processing loop: receives pre-read chunk, then IO thread 
chunks
+            let mut global_sample_offset = 0;
+            let mut use_dev_a = true;
+            let state_len_per_sample = 1 << num_qubits;
+
+            loop {
+                let (host_buffer, current_len) = full_buf_rx.recv()
+                    .map_err(|_| MahoutError::Io("IO thread 
disconnected".into()))?;
+
+                // len == 0 means IO thread finished (don't recycle buffer)
+                if current_len == 0 { break; }
+
+                let samples_in_chunk = current_len / sample_size;
+                if samples_in_chunk > 0 {
+                    let dev_ptr = if use_dev_a { *dev_in_a.device_ptr() } else 
{ *dev_in_b.device_ptr() };
+
+                    unsafe {
+                        crate::profile_scope!("GPU::Dispatch");
+
+                        // Async H2D copy → record event → wait for copy → 
launch kernel
+                        ctx.async_copy_to_device(&host_buffer, dev_ptr as *mut 
c_void, current_len);
+                        ctx.record_copy_done();
+                        ctx.wait_for_copy();
+
+                        // Compute norms and encode batch
+                        {
+                            crate::profile_scope!("GPU::BatchEncode");
+                            let offset_elements = global_sample_offset * 
state_len_per_sample;
+                            let state_ptr_offset = 
total_state_vector.ptr().cast::<u8>()
+                                .add(offset_elements * 
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+                                .cast::<std::ffi::c_void>();
+
+                            // Allocate norm buffer for this chunk
+                            let mut norm_buffer = 
self.device.alloc_zeros::<f64>(samples_in_chunk)
+                                .map_err(|e| 
MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", 
e)))?;
+
+                            // Step 1: Compute L2 norms for this chunk
+                            {
+                                crate::profile_scope!("GPU::NormBatch");
+                                let ret = launch_l2_norm_batch(
+                                    dev_ptr as *const f64,
+                                    samples_in_chunk,
+                                    sample_size,
+                                    *norm_buffer.device_ptr_mut() as *mut f64,
+                                    ctx.stream_compute.stream as *mut c_void
+                                );
+                                if ret != 0 {
+                                    return 
Err(MahoutError::KernelLaunch(format!("Norm kernel error: {}", ret)));
+                                }
+                            }
+
+                            // Step 2: Encode batch using computed norms
+                            {
+                                crate::profile_scope!("GPU::EncodeBatch");
+                                let ret = launch_amplitude_encode_batch(
+                                    dev_ptr as *const f64,
+                                    state_ptr_offset,
+                                    *norm_buffer.device_ptr() as *const f64,
+                                    samples_in_chunk,
+                                    sample_size,
+                                    state_len_per_sample,
+                                    ctx.stream_compute.stream as *mut c_void
+                                );
+                                if ret != 0 {
+                                    return 
Err(MahoutError::KernelLaunch(format!("Encode kernel error: {}", ret)));
+                                }
+                            }
+                        }
+
+                        // Sync copy stream before buffer reuse
+                        ctx.sync_copy_stream();
+                    }

Review Comment:
   The unsafe block spans over 50 lines of code (lines 246-301), which makes it 
difficult to audit for safety. Consider minimizing the unsafe scope by wrapping 
only the actual unsafe operations (CUDA FFI calls) in unsafe blocks, while 
keeping safe Rust operations outside. This improves code maintainability and 
makes it easier to verify safety invariants.



##########
qdp/qdp-core/src/gpu/memory.rs:
##########
@@ -263,3 +266,79 @@ impl GpuStateVector {
         }
     }
 }
+
+// === Pinned Memory Implementation ===
+
+/// Pinned Host Memory Buffer (Page-Locked)
+///
+/// Enables DMA for H2D copies, doubling bandwidth and reducing CPU usage.
+#[cfg(target_os = "linux")]
+pub struct PinnedBuffer {
+    ptr: *mut f64,
+    size_elements: usize,
+}
+
+#[cfg(target_os = "linux")]
+impl PinnedBuffer {
+    /// Allocate pinned memory
+    pub fn new(elements: usize) -> Result<Self> {
+        unsafe {
+            let bytes = elements * std::mem::size_of::<f64>();
+            let mut ptr: *mut c_void = std::ptr::null_mut();
+
+            unsafe extern "C" {
+                fn cudaHostAlloc(pHost: *mut *mut c_void, size: usize, flags: 
u32) -> i32;
+            }

Review Comment:
   The unsafe extern "C" block is declared inside a function scope (lines 
289-291). This is unconventional and reduces readability. The same function is 
also redeclared in pipeline.rs (line 332). Consider declaring all CUDA FFI 
functions once in a centralized location (e.g., at the module level in 
pipeline.rs) and importing them where needed to avoid duplication and improve 
maintainability.



##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> 
Result<(Vec<f64>, usize,
 
     Ok((all_data, num_samples, sample_size))
 }
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+    reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+    sample_size: Option<usize>,
+    leftover_data: Vec<f64>,
+    leftover_cursor: usize,
+    pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+    /// Create a new streaming Parquet reader
+    ///
+    /// Uses batch size 2048 for optimal throughput on large row groups.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let file = File::open(path.as_ref()).map_err(|e| {
+            MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+        })?;
+
+        let builder = 
ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+            MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+        })?;
+
+        let total_rows = builder.metadata().file_metadata().num_rows() as 
usize;
+
+        let reader = builder
+            .with_batch_size(2048) // Optimized for large row groups
+            .build()
+            .map_err(|e| {
+                MahoutError::Io(format!("Failed to build Parquet reader: {}", 
e))
+            })?;
+
+        Ok(Self {
+            reader,
+            sample_size: None,
+            leftover_data: Vec::new(),
+            leftover_cursor: 0,
+            total_samples: total_rows,
+        })
+    }
+
+    /// Get the sample size (number of elements per sample)
+    pub fn get_sample_size(&self) -> Option<usize> {
+        self.sample_size
+    }
+
+    /// Read a chunk of data into the provided buffer
+    ///
+    /// Handles leftover data from previous reads and ensures sample 
boundaries are respected.
+    /// Returns the number of elements written to the buffer.
+    pub fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+        let mut written = 0;
+        let buf_cap = buffer.len();
+        let calc_limit = |ss: usize| -> usize {
+            if ss == 0 {
+                buf_cap
+            } else {
+                (buf_cap / ss) * ss
+            }
+        };
+        let mut limit = self.sample_size.map_or(buf_cap, calc_limit);
+
+        // Drain leftover data from previous read
+        if self.leftover_cursor < self.leftover_data.len() {
+            let available = self.leftover_data.len() - self.leftover_cursor;
+            let space_left = limit - written;
+            let to_copy = std::cmp::min(available, space_left);
+
+            if to_copy > 0 {
+                buffer[written..written+to_copy].copy_from_slice(
+                    
&self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy]
+                );
+                written += to_copy;
+                self.leftover_cursor += to_copy;
+                if self.leftover_cursor == self.leftover_data.len() {
+                    self.leftover_data.clear();
+                    self.leftover_cursor = 0;
+                }
+            }
+            if written == limit {
+                return Ok(written);
+            }
+        }
+
+        // Read new batches from Parquet
+        while written < limit {
+            match self.reader.next() {
+                Some(Ok(batch)) => {
+                    if batch.num_columns() == 0 {
+                        continue;
+                    }
+                    let column = batch.column(0);
+
+                    let list_array = column
+                        .as_any()
+                        .downcast_ref::<ListArray>()
+                        .ok_or_else(|| MahoutError::Io("Failed to downcast to 
ListArray".to_string()))?;
+
+                    if list_array.len() == 0 {
+                        continue;
+                    }
+
+                    // Extract sample size from first element
+                    let first_value = list_array.value(0);
+                    let float_array = first_value
+                        .as_any()
+                        .downcast_ref::<Float64Array>()
+                        .ok_or_else(|| MahoutError::Io("List values must be 
Float64".to_string()))?;
+
+                    let current_sample_size = float_array.len();
+
+                    if self.sample_size.is_none() {
+                        self.sample_size = Some(current_sample_size);
+                        limit = calc_limit(current_sample_size);
+                    }
+
+                    // Extract all values from this batch
+                    let mut batch_values = Vec::new();
+                    for i in 0..list_array.len() {
+                        let value_array = list_array.value(i);
+                        let float_array = value_array
+                            .as_any()
+                            .downcast_ref::<Float64Array>()
+                            .ok_or_else(|| MahoutError::Io("List values must 
be Float64".to_string()))?;
+
+                        if float_array.null_count() == 0 {
+                            
batch_values.extend_from_slice(float_array.values());
+                        } else {
+                            batch_values.extend(float_array.iter().map(|opt| 
opt.unwrap_or(0.0)));
+                        }
+                    }
+
+                    let available = batch_values.len();
+                    let space_left = if written >= limit { 0 } else { limit - 
written };
+
+                    if available <= space_left {
+                        
buffer[written..written+available].copy_from_slice(&batch_values);
+                        written += available;
+                    } else {
+                        if space_left > 0 {
+                            
buffer[written..written+space_left].copy_from_slice(&batch_values[0..space_left]);
+                            written += space_left;
+                        }
+                        // Save overflow as leftover for next read
+                        self.leftover_data.clear();
+                        
self.leftover_data.extend_from_slice(&batch_values[space_left..]);
+                        self.leftover_cursor = 0;
+                        break;
+                    }
+                },
+                Some(Err(e)) => return Err(MahoutError::Io(format!("Parquet 
read error: {}", e))),
+                None => break,
+            }
+        }
+
+        Ok(written)
+    }

Review Comment:
   The function returns Ok(written) even when a Parquet read error hasn't 
occurred but no more data is available (None case at line 595). However, if 
written is 0 and the file still has data, this could indicate an issue with the 
reader that's being silently ignored. Consider documenting this behavior more 
clearly or adding a check to distinguish between "end of file" and "no data 
written due to buffer constraints".



##########
qdp/qdp-core/src/gpu/pipeline.rs:
##########
@@ -24,7 +24,104 @@ use std::ffi::c_void;
 use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
 use crate::error::{MahoutError, Result};
 #[cfg(target_os = "linux")]
-use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
+use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error, 
PinnedBuffer};
+
+// CUDA FFI: centralized here to keep lib.rs clean
+#[cfg(target_os = "linux")]
+unsafe extern "C" {
+    fn cudaMemcpyAsync(dst: *mut c_void, src: *const c_void, count: usize, 
kind: u32, stream: *mut c_void) -> i32;
+    fn cudaEventCreateWithFlags(event: *mut *mut c_void, flags: u32) -> i32;
+    fn cudaEventRecord(event: *mut c_void, stream: *mut c_void) -> i32;
+    fn cudaEventDestroy(event: *mut c_void) -> i32;
+    fn cudaStreamWaitEvent(stream: *mut c_void, event: *mut c_void, flags: 
u32) -> i32;
+    fn cudaStreamSynchronize(stream: *mut c_void) -> i32;
+}
+
+#[cfg(target_os = "linux")]
+const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1;
+#[cfg(target_os = "linux")]
+const CUDA_EVENT_DISABLE_TIMING: u32 = 0x02;
+
+/// Dual-stream pipeline context: manages compute/copy streams and sync events
+#[cfg(target_os = "linux")]
+pub struct PipelineContext {
+    pub stream_compute: CudaStream,
+    pub stream_copy: CudaStream,
+    event_copy_done: *mut c_void,
+}
+
+#[cfg(target_os = "linux")]
+impl PipelineContext {
+    /// Create dual streams and sync event
+    pub fn new(device: &Arc<CudaDevice>) -> Result<Self> {
+        let stream_compute = device.fork_default_stream()
+            .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+        let stream_copy = device.fork_default_stream()
+            .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+
+        let mut event_copy_done: *mut c_void = std::ptr::null_mut();
+        unsafe {
+            let ret = cudaEventCreateWithFlags(&mut event_copy_done, 
CUDA_EVENT_DISABLE_TIMING);
+            if ret != 0 {
+                return Err(MahoutError::Cuda(format!("Failed to create CUDA 
event: {}", ret)));
+            }
+        }
+
+        Ok(Self {
+            stream_compute,
+            stream_copy,
+            event_copy_done,
+        })
+    }
+
+    /// Async H2D copy on copy stream
+    pub unsafe fn async_copy_to_device(&self, src: &PinnedBuffer, dst: *mut 
c_void, len_elements: usize) {
+        crate::profile_scope!("GPU::H2D_Copy");
+        unsafe {
+            cudaMemcpyAsync(
+                dst,
+                src.ptr() as *const c_void,
+                len_elements * std::mem::size_of::<f64>(),
+                CUDA_MEMCPY_HOST_TO_DEVICE,
+                self.stream_copy.stream as *mut c_void
+            );
+        }
+    }
+
+    /// Record copy completion event
+    pub unsafe fn record_copy_done(&self) {
+        unsafe {
+            cudaEventRecord(self.event_copy_done, self.stream_copy.stream as 
*mut c_void);
+        }
+    }
+
+    /// Make compute stream wait for copy completion
+    pub unsafe fn wait_for_copy(&self) {
+        crate::profile_scope!("GPU::StreamWait");
+        unsafe {
+            cudaStreamWaitEvent(self.stream_compute.stream as *mut c_void, 
self.event_copy_done, 0);
+        }
+    }
+
+    /// Sync copy stream (safe to reuse host buffer)
+    pub unsafe fn sync_copy_stream(&self) {
+        crate::profile_scope!("Pipeline::SyncCopy");
+        unsafe {
+            cudaStreamSynchronize(self.stream_copy.stream as *mut c_void);
+        }

Review Comment:
   The cudaStreamSynchronize call at line 110 doesn't check its return value. 
If synchronization fails, the function will return without ensuring the copy 
stream has completed, which could lead to use-after-free bugs when the host 
buffer is reused. Add error checking and propagate the error.
   ```suggestion
       pub unsafe fn sync_copy_stream(&self) -> Result<()> {
           crate::profile_scope!("Pipeline::SyncCopy");
           unsafe {
               let ret = cudaStreamSynchronize(self.stream_copy.stream as *mut 
c_void);
               if ret != 0 {
                   return Err(MahoutError::Cuda(format!("Failed to synchronize 
CUDA copy stream: {}", ret)));
               }
           }
           Ok(())
   ```



##########
qdp/qdp-core/src/gpu/memory.rs:
##########
@@ -263,3 +266,79 @@ impl GpuStateVector {
         }
     }
 }
+
+// === Pinned Memory Implementation ===
+
+/// Pinned Host Memory Buffer (Page-Locked)
+///
+/// Enables DMA for H2D copies, doubling bandwidth and reducing CPU usage.
+#[cfg(target_os = "linux")]
+pub struct PinnedBuffer {
+    ptr: *mut f64,
+    size_elements: usize,
+}
+
+#[cfg(target_os = "linux")]
+impl PinnedBuffer {
+    /// Allocate pinned memory
+    pub fn new(elements: usize) -> Result<Self> {
+        unsafe {
+            let bytes = elements * std::mem::size_of::<f64>();
+            let mut ptr: *mut c_void = std::ptr::null_mut();
+
+            unsafe extern "C" {
+                fn cudaHostAlloc(pHost: *mut *mut c_void, size: usize, flags: 
u32) -> i32;
+            }
+
+            let ret = cudaHostAlloc(&mut ptr, bytes, 0); // 
cudaHostAllocDefault
+
+            if ret != 0 {
+                return Err(MahoutError::MemoryAllocation(
+                    format!("cudaHostAlloc failed with error code: {}", ret)
+                ));
+            }
+
+            Ok(Self {
+                ptr: ptr as *mut f64,
+                size_elements: elements,
+            })
+        }
+    }
+
+    /// Get mutable slice to write data into
+    pub fn as_slice_mut(&mut self) -> &mut [f64] {
+        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.size_elements) }
+    }
+
+    /// Get raw pointer for CUDA memcpy
+    pub fn ptr(&self) -> *const f64 {
+        self.ptr
+    }
+
+    pub fn len(&self) -> usize {
+        self.size_elements
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.size_elements == 0
+    }
+}
+
+#[cfg(target_os = "linux")]
+impl Drop for PinnedBuffer {
+    fn drop(&mut self) {
+        unsafe {
+            unsafe extern "C" {
+                fn cudaFreeHost(ptr: *mut c_void) -> i32;
+            }
+            let _ = cudaFreeHost(self.ptr as *mut c_void);
+        }
+    }
+}
+
+// Safety: Pinned memory is accessible from any thread
+#[cfg(target_os = "linux")]
+unsafe impl Send for PinnedBuffer {}
+
+#[cfg(target_os = "linux")]
+unsafe impl Sync for PinnedBuffer {}

Review Comment:
   The PinnedBuffer implementation lacks test coverage. Given that the codebase 
has memory safety tests (tests/memory_safety.rs exists), this new memory 
allocation mechanism should have tests to verify proper allocation, 
deallocation, thread safety, and interaction with CUDA operations.



##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +166,160 @@ impl QdpEngine {
     ) -> Result<*mut DLManagedTensor> {
         crate::profile_scope!("Mahout::EncodeFromParquet");
 
-        // Read Parquet directly using Arrow (faster than pandas)
-        let (batch_data, num_samples, sample_size) = {
-            crate::profile_scope!("IO::ReadParquetBatch");
-            crate::io::read_parquet_batch(path)?
-        };
+        #[cfg(target_os = "linux")]
+        {
+            if encoding_method != "amplitude" {
+                return Err(MahoutError::NotImplemented("Only amplitude 
encoding supported for streaming".into()));
+            }
 
-        // Encode using fused batch kernel
-        self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, 
encoding_method)
+            // Initialize reader
+            let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+            let num_samples = reader_core.total_samples;
+
+            // Allocate GPU memory once
+            let total_state_vector = GpuStateVector::new_batch(&self.device, 
num_samples, num_qubits)?;
+
+            // Initialize dual-stream pipeline context
+            let ctx = PipelineContext::new(&self.device)?;
+
+            // Double-buffered device input (ping-pong)
+            let dev_in_a = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+            let dev_in_b = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+
+            // Setup Producer-Consumer channels
+            let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer, 
usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+            let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>, 
Receiver<PinnedBuffer>) = sync_channel(2);
+
+            // CRITICAL FIX: Pre-read first chunk to determine sample_size
+            // This data must be processed, not discarded!
+            let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+            let first_len = 
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+            let sample_size = reader_core.get_sample_size()
+                .ok_or_else(|| MahoutError::InvalidInput("Could not determine 
sample size".into()))?;
+
+            // Send first chunk directly to GPU loop (must be processed first)
+            full_buf_tx.send((host_buf_first, first_len))
+                .map_err(|_| MahoutError::Io("Failed to send first 
buffer".into()))?;
+
+            // Send one empty buffer to IO thread for subsequent reads
+            empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+                .map_err(|_| MahoutError::Io("Failed to send second 
buffer".into()))?;
+
+            // Spawn IO thread (Producer): continues reading from second chunk 
onwards
+            let mut reader = reader_core;
+            let io_handle = thread::spawn(move || {
+                loop {
+                    let mut buffer = match empty_buf_rx.recv() {
+                        Ok(b) => b,
+                        Err(_) => break,
+                    };
+
+                    let len = match reader.read_chunk(buffer.as_slice_mut()) {
+                        Ok(l) => l,
+                        Err(e) => { eprintln!("IO Error: {:?}", e); 0 }
+                    };
+
+                    if full_buf_tx.send((buffer, len)).is_err() { break; }
+                    if len == 0 { break; }
+                }
+            });
+
+            // GPU processing loop: receives pre-read chunk, then IO thread 
chunks
+            let mut global_sample_offset = 0;
+            let mut use_dev_a = true;
+            let state_len_per_sample = 1 << num_qubits;
+
+            loop {
+                let (host_buffer, current_len) = full_buf_rx.recv()
+                    .map_err(|_| MahoutError::Io("IO thread 
disconnected".into()))?;
+
+                // len == 0 means IO thread finished (don't recycle buffer)
+                if current_len == 0 { break; }
+
+                let samples_in_chunk = current_len / sample_size;
+                if samples_in_chunk > 0 {
+                    let dev_ptr = if use_dev_a { *dev_in_a.device_ptr() } else 
{ *dev_in_b.device_ptr() };
+
+                    unsafe {
+                        crate::profile_scope!("GPU::Dispatch");
+
+                        // Async H2D copy → record event → wait for copy → 
launch kernel
+                        ctx.async_copy_to_device(&host_buffer, dev_ptr as *mut 
c_void, current_len);
+                        ctx.record_copy_done();
+                        ctx.wait_for_copy();
+
+                        // Compute norms and encode batch
+                        {
+                            crate::profile_scope!("GPU::BatchEncode");
+                            let offset_elements = global_sample_offset * 
state_len_per_sample;
+                            let state_ptr_offset = 
total_state_vector.ptr().cast::<u8>()
+                                .add(offset_elements * 
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+                                .cast::<std::ffi::c_void>();
+
+                            // Allocate norm buffer for this chunk
+                            let mut norm_buffer = 
self.device.alloc_zeros::<f64>(samples_in_chunk)
+                                .map_err(|e| 
MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", 
e)))?;
+
+                            // Step 1: Compute L2 norms for this chunk
+                            {
+                                crate::profile_scope!("GPU::NormBatch");
+                                let ret = launch_l2_norm_batch(
+                                    dev_ptr as *const f64,
+                                    samples_in_chunk,
+                                    sample_size,
+                                    *norm_buffer.device_ptr_mut() as *mut f64,
+                                    ctx.stream_compute.stream as *mut c_void
+                                );
+                                if ret != 0 {
+                                    return 
Err(MahoutError::KernelLaunch(format!("Norm kernel error: {}", ret)));
+                                }
+                            }
+
+                            // Step 2: Encode batch using computed norms
+                            {
+                                crate::profile_scope!("GPU::EncodeBatch");
+                                let ret = launch_amplitude_encode_batch(
+                                    dev_ptr as *const f64,
+                                    state_ptr_offset,
+                                    *norm_buffer.device_ptr() as *const f64,
+                                    samples_in_chunk,
+                                    sample_size,
+                                    state_len_per_sample,
+                                    ctx.stream_compute.stream as *mut c_void
+                                );
+                                if ret != 0 {
+                                    return 
Err(MahoutError::KernelLaunch(format!("Encode kernel error: {}", ret)));
+                                }
+                            }
+                        }
+
+                        // Sync copy stream before buffer reuse
+                        ctx.sync_copy_stream();
+                    }
+                    global_sample_offset += samples_in_chunk;
+                    use_dev_a = !use_dev_a;
+                }
+
+                // Return buffer to IO thread (ignore errors if thread exited)
+                let _ = empty_buf_tx.send(host_buffer);
+            }
+
+            self.device.synchronize().map_err(|e| 
MahoutError::Cuda(format!("{:?}", e)))?;
+            let _ = io_handle.join();

Review Comment:
   If the IO thread panics, the join().unwrap() pattern would be better. 
Currently, the error from join is ignored with let _ = io_handle.join(). If the 
thread panicked, this could indicate a serious issue that should be surfaced. 
Consider checking the join result and logging or returning an error if the 
thread panicked.
   ```suggestion
               io_handle.join().map_err(|e| MahoutError::Io(format!("IO thread 
panicked: {:?}", e)))?;
   ```



##########
qdp/qdp-core/src/lib.rs:
##########
@@ -47,9 +65,16 @@ impl QdpEngine {
     pub fn new(device_id: usize) -> Result<Self> {
         let device = CudaDevice::new(device_id)
             .map_err(|e| MahoutError::Cuda(format!("Failed to initialize CUDA 
device {}: {:?}", device_id, e)))?;
-        Ok(Self {
-            device  // CudaDevice::new already returns Arc<CudaDevice> in 
cudarc 0.11
-        })
+
+        #[cfg(target_os = "linux")]
+        {
+            Ok(Self { device })
+        }
+
+        #[cfg(not(target_os = "linux"))]
+        {
+            Ok(Self { device })
+        }

Review Comment:
   The conditional compilation for QdpEngine::new creates identical code for 
both Linux and non-Linux targets (lines 69-77). This redundancy can be 
simplified by removing the cfg attributes entirely, as the code is the same 
regardless of platform.
   ```suggestion
           Ok(Self { device })
   ```



##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +166,160 @@ impl QdpEngine {
     ) -> Result<*mut DLManagedTensor> {
         crate::profile_scope!("Mahout::EncodeFromParquet");
 
-        // Read Parquet directly using Arrow (faster than pandas)
-        let (batch_data, num_samples, sample_size) = {
-            crate::profile_scope!("IO::ReadParquetBatch");
-            crate::io::read_parquet_batch(path)?
-        };
+        #[cfg(target_os = "linux")]
+        {
+            if encoding_method != "amplitude" {
+                return Err(MahoutError::NotImplemented("Only amplitude 
encoding supported for streaming".into()));
+            }
 
-        // Encode using fused batch kernel
-        self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, 
encoding_method)
+            // Initialize reader
+            let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+            let num_samples = reader_core.total_samples;
+
+            // Allocate GPU memory once
+            let total_state_vector = GpuStateVector::new_batch(&self.device, 
num_samples, num_qubits)?;
+
+            // Initialize dual-stream pipeline context
+            let ctx = PipelineContext::new(&self.device)?;
+
+            // Double-buffered device input (ping-pong)
+            let dev_in_a = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+            let dev_in_b = unsafe { 
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", 
e)))?;
+
+            // Setup Producer-Consumer channels
+            let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer, 
usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+            let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>, 
Receiver<PinnedBuffer>) = sync_channel(2);
+
+            // CRITICAL FIX: Pre-read first chunk to determine sample_size
+            // This data must be processed, not discarded!
+            let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+            let first_len = 
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+            let sample_size = reader_core.get_sample_size()
+                .ok_or_else(|| MahoutError::InvalidInput("Could not determine 
sample size".into()))?;
+
+            // Send first chunk directly to GPU loop (must be processed first)
+            full_buf_tx.send((host_buf_first, first_len))
+                .map_err(|_| MahoutError::Io("Failed to send first 
buffer".into()))?;
+
+            // Send one empty buffer to IO thread for subsequent reads
+            empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+                .map_err(|_| MahoutError::Io("Failed to send second 
buffer".into()))?;
+
+            // Spawn IO thread (Producer): continues reading from second chunk 
onwards
+            let mut reader = reader_core;

Review Comment:
   The reader is moved into the IO thread without explicit documentation of 
thread safety. While ParquetRecordBatchReader likely implements Send, this 
should be verified and documented, especially since the struct maintains 
internal state (sample_size, leftover_data) that could theoretically be 
problematic if the reader's underlying implementation has thread-affinity 
requirements.
   ```suggestion
               let mut reader = reader_core;
               // SAFETY: We assert at compile time that the reader is Send, as 
it is moved into a thread.
               // If the underlying reader implementation is not thread-safe, 
this will fail to compile.
               let _assert_send: fn() = || {
                   fn assert_send<T: Send>() {}
                   assert_send::<_>();
               };
   ```



##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> 
Result<(Vec<f64>, usize,
 
     Ok((all_data, num_samples, sample_size))
 }
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+    reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+    sample_size: Option<usize>,
+    leftover_data: Vec<f64>,
+    leftover_cursor: usize,
+    pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+    /// Create a new streaming Parquet reader
+    ///
+    /// Uses batch size 2048 for optimal throughput on large row groups.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let file = File::open(path.as_ref()).map_err(|e| {
+            MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+        })?;
+
+        let builder = 
ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+            MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+        })?;
+
+        let total_rows = builder.metadata().file_metadata().num_rows() as 
usize;
+
+        let reader = builder
+            .with_batch_size(2048) // Optimized for large row groups
+            .build()
+            .map_err(|e| {
+                MahoutError::Io(format!("Failed to build Parquet reader: {}", 
e))
+            })?;
+
+        Ok(Self {
+            reader,
+            sample_size: None,
+            leftover_data: Vec::new(),
+            leftover_cursor: 0,
+            total_samples: total_rows,
+        })
+    }
+
+    /// Get the sample size (number of elements per sample)
+    pub fn get_sample_size(&self) -> Option<usize> {
+        self.sample_size
+    }
+
+    /// Read a chunk of data into the provided buffer
+    ///
+    /// Handles leftover data from previous reads and ensures sample 
boundaries are respected.
+    /// Returns the number of elements written to the buffer.
+    pub fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+        let mut written = 0;
+        let buf_cap = buffer.len();
+        let calc_limit = |ss: usize| -> usize {
+            if ss == 0 {
+                buf_cap
+            } else {
+                (buf_cap / ss) * ss
+            }
+        };
+        let mut limit = self.sample_size.map_or(buf_cap, calc_limit);
+
+        // Drain leftover data from previous read
+        if self.leftover_cursor < self.leftover_data.len() {
+            let available = self.leftover_data.len() - self.leftover_cursor;
+            let space_left = limit - written;
+            let to_copy = std::cmp::min(available, space_left);
+
+            if to_copy > 0 {
+                buffer[written..written+to_copy].copy_from_slice(
+                    
&self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy]
+                );
+                written += to_copy;
+                self.leftover_cursor += to_copy;
+                if self.leftover_cursor == self.leftover_data.len() {
+                    self.leftover_data.clear();
+                    self.leftover_cursor = 0;
+                }
+            }
+            if written == limit {
+                return Ok(written);
+            }
+        }
+
+        // Read new batches from Parquet
+        while written < limit {
+            match self.reader.next() {
+                Some(Ok(batch)) => {
+                    if batch.num_columns() == 0 {
+                        continue;
+                    }
+                    let column = batch.column(0);
+
+                    let list_array = column
+                        .as_any()
+                        .downcast_ref::<ListArray>()
+                        .ok_or_else(|| MahoutError::Io("Failed to downcast to 
ListArray".to_string()))?;
+
+                    if list_array.len() == 0 {
+                        continue;
+                    }
+
+                    // Extract sample size from first element
+                    let first_value = list_array.value(0);
+                    let float_array = first_value
+                        .as_any()
+                        .downcast_ref::<Float64Array>()
+                        .ok_or_else(|| MahoutError::Io("List values must be 
Float64".to_string()))?;
+
+                    let current_sample_size = float_array.len();
+
+                    if self.sample_size.is_none() {
+                        self.sample_size = Some(current_sample_size);
+                        limit = calc_limit(current_sample_size);
+                    }
+
+                    // Extract all values from this batch
+                    let mut batch_values = Vec::new();
+                    for i in 0..list_array.len() {
+                        let value_array = list_array.value(i);
+                        let float_array = value_array
+                            .as_any()
+                            .downcast_ref::<Float64Array>()
+                            .ok_or_else(|| MahoutError::Io("List values must 
be Float64".to_string()))?;
+
+                        if float_array.null_count() == 0 {
+                            
batch_values.extend_from_slice(float_array.values());
+                        } else {
+                            batch_values.extend(float_array.iter().map(|opt| 
opt.unwrap_or(0.0)));

Review Comment:
   When handling null values in the Float64Array at line 572, they are replaced 
with 0.0. This could silently introduce incorrect data into the quantum 
encoding pipeline. Consider returning an error or at least logging a warning 
when null values are encountered, as this may indicate data quality issues that 
should be addressed at the source.
   ```suggestion
                               return Err(MahoutError::Io("Null value 
encountered in Float64Array during quantum encoding. Please check data quality 
at the source.".to_string()));
   ```



##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> 
Result<(Vec<f64>, usize,
 
     Ok((all_data, num_samples, sample_size))
 }
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+    reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+    sample_size: Option<usize>,
+    leftover_data: Vec<f64>,
+    leftover_cursor: usize,
+    pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+    /// Create a new streaming Parquet reader
+    ///
+    /// Uses batch size 2048 for optimal throughput on large row groups.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let file = File::open(path.as_ref()).map_err(|e| {
+            MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+        })?;
+
+        let builder = 
ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+            MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+        })?;
+
+        let total_rows = builder.metadata().file_metadata().num_rows() as 
usize;
+
+        let reader = builder
+            .with_batch_size(2048) // Optimized for large row groups
+            .build()
+            .map_err(|e| {
+                MahoutError::Io(format!("Failed to build Parquet reader: {}", 
e))
+            })?;
+
+        Ok(Self {
+            reader,
+            sample_size: None,
+            leftover_data: Vec::new(),
+            leftover_cursor: 0,
+            total_samples: total_rows,
+        })
+    }
+
+    /// Get the sample size (number of elements per sample)
+    pub fn get_sample_size(&self) -> Option<usize> {
+        self.sample_size
+    }
+
+    /// Read a chunk of data into the provided buffer
+    ///
+    /// Handles leftover data from previous reads and ensures sample 
boundaries are respected.
+    /// Returns the number of elements written to the buffer.
+    pub fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+        let mut written = 0;
+        let buf_cap = buffer.len();
+        let calc_limit = |ss: usize| -> usize {
+            if ss == 0 {
+                buf_cap
+            } else {
+                (buf_cap / ss) * ss
+            }
+        };
+        let mut limit = self.sample_size.map_or(buf_cap, calc_limit);
+
+        // Drain leftover data from previous read
+        if self.leftover_cursor < self.leftover_data.len() {
+            let available = self.leftover_data.len() - self.leftover_cursor;
+            let space_left = limit - written;
+            let to_copy = std::cmp::min(available, space_left);
+
+            if to_copy > 0 {
+                buffer[written..written+to_copy].copy_from_slice(
+                    
&self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy]
+                );
+                written += to_copy;
+                self.leftover_cursor += to_copy;
+                if self.leftover_cursor == self.leftover_data.len() {
+                    self.leftover_data.clear();
+                    self.leftover_cursor = 0;
+                }
+            }
+            if written == limit {
+                return Ok(written);
+            }
+        }
+
+        // Read new batches from Parquet
+        while written < limit {
+            match self.reader.next() {
+                Some(Ok(batch)) => {
+                    if batch.num_columns() == 0 {
+                        continue;
+                    }
+                    let column = batch.column(0);
+
+                    let list_array = column
+                        .as_any()
+                        .downcast_ref::<ListArray>()
+                        .ok_or_else(|| MahoutError::Io("Failed to downcast to 
ListArray".to_string()))?;
+
+                    if list_array.len() == 0 {
+                        continue;
+                    }
+
+                    // Extract sample size from first element
+                    let first_value = list_array.value(0);
+                    let float_array = first_value
+                        .as_any()
+                        .downcast_ref::<Float64Array>()
+                        .ok_or_else(|| MahoutError::Io("List values must be 
Float64".to_string()))?;
+
+                    let current_sample_size = float_array.len();
+
+                    if self.sample_size.is_none() {
+                        self.sample_size = Some(current_sample_size);
+                        limit = calc_limit(current_sample_size);
+                    }
+
+                    // Extract all values from this batch
+                    let mut batch_values = Vec::new();
+                    for i in 0..list_array.len() {
+                        let value_array = list_array.value(i);
+                        let float_array = value_array
+                            .as_any()
+                            .downcast_ref::<Float64Array>()
+                            .ok_or_else(|| MahoutError::Io("List values must 
be Float64".to_string()))?;
+
+                        if float_array.null_count() == 0 {
+                            
batch_values.extend_from_slice(float_array.values());
+                        } else {
+                            batch_values.extend(float_array.iter().map(|opt| 
opt.unwrap_or(0.0)));
+                        }
+                    }
+
+                    let available = batch_values.len();
+                    let space_left = if written >= limit { 0 } else { limit - 
written };
+
+                    if available <= space_left {
+                        
buffer[written..written+available].copy_from_slice(&batch_values);
+                        written += available;
+                    } else {
+                        if space_left > 0 {
+                            
buffer[written..written+space_left].copy_from_slice(&batch_values[0..space_left]);
+                            written += space_left;
+                        }
+                        // Save overflow as leftover for next read
+                        self.leftover_data.clear();
+                        
self.leftover_data.extend_from_slice(&batch_values[space_left..]);
+                        self.leftover_cursor = 0;
+                        break;
+                    }
+                },
+                Some(Err(e)) => return Err(MahoutError::Io(format!("Parquet 
read error: {}", e))),
+                None => break,
+            }
+        }
+
+        Ok(written)
+    }
+}

Review Comment:
   The new streaming Parquet reader (ParquetBlockReader) lacks test coverage. 
Given that other IO functionality in the codebase has comprehensive tests (as 
seen in tests/parquet_io.rs), this new component should also have tests to 
verify correct behavior, especially for edge cases like empty files, partial 
reads, leftover data handling, and sample boundary alignment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to