Copilot commented on code in PR #708:
URL: https://github.com/apache/mahout/pull/708#discussion_r2613122645
##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +166,160 @@ impl QdpEngine {
) -> Result<*mut DLManagedTensor> {
crate::profile_scope!("Mahout::EncodeFromParquet");
- // Read Parquet directly using Arrow (faster than pandas)
- let (batch_data, num_samples, sample_size) = {
- crate::profile_scope!("IO::ReadParquetBatch");
- crate::io::read_parquet_batch(path)?
- };
+ #[cfg(target_os = "linux")]
+ {
+ if encoding_method != "amplitude" {
+ return Err(MahoutError::NotImplemented("Only amplitude
encoding supported for streaming".into()));
+ }
- // Encode using fused batch kernel
- self.encode_batch(&batch_data, num_samples, sample_size, num_qubits,
encoding_method)
+ // Initialize reader
+ let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+ let num_samples = reader_core.total_samples;
+
+ // Allocate GPU memory once
+ let total_state_vector = GpuStateVector::new_batch(&self.device,
num_samples, num_qubits)?;
+
+ // Initialize dual-stream pipeline context
+ let ctx = PipelineContext::new(&self.device)?;
+
+ // Double-buffered device input (ping-pong)
+ let dev_in_a = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+ .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}",
e)))?;
+ let dev_in_b = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+ .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}",
e)))?;
+
+ // Setup Producer-Consumer channels
+ let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer,
usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+ let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>,
Receiver<PinnedBuffer>) = sync_channel(2);
+
+ // CRITICAL FIX: Pre-read first chunk to determine sample_size
+ // This data must be processed, not discarded!
+ let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+ let first_len =
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+ let sample_size = reader_core.get_sample_size()
+ .ok_or_else(|| MahoutError::InvalidInput("Could not determine
sample size".into()))?;
+
+ // Send first chunk directly to GPU loop (must be processed first)
+ full_buf_tx.send((host_buf_first, first_len))
+ .map_err(|_| MahoutError::Io("Failed to send first
buffer".into()))?;
+
+ // Send one empty buffer to IO thread for subsequent reads
+ empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+ .map_err(|_| MahoutError::Io("Failed to send second
buffer".into()))?;
+
+ // Spawn IO thread (Producer): continues reading from second chunk
onwards
+ let mut reader = reader_core;
+ let io_handle = thread::spawn(move || {
+ loop {
+ let mut buffer = match empty_buf_rx.recv() {
+ Ok(b) => b,
+ Err(_) => break,
+ };
+
+ let len = match reader.read_chunk(buffer.as_slice_mut()) {
+ Ok(l) => l,
+ Err(e) => { eprintln!("IO Error: {:?}", e); 0 }
Review Comment:
The error from the IO thread is only printed to stderr; the thread then sends a
zero-length sentinel, so the GPU loop treats the failure as end-of-file and the
result is silently truncated. Consider propagating the error through the channel
(for example by sending a `Result`) or setting a flag the main thread can check,
so the GPU loop can abort with an appropriate error instead of encoding partial data.
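A minimal sketch of the channel-based approach, using plain std types rather than the PR's PinnedBuffer so it stands alone: the producer sends `Result` items, and the consumer can tell a read failure apart from a normal end of stream.
```rust
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // Each message is either a filled chunk or the producer's error message.
    let (tx, rx) = sync_channel::<Result<Vec<f64>, String>>(2);

    let producer = thread::spawn(move || {
        for i in 0..3 {
            // Stand-in for reader.read_chunk(); simulate a failure on the last chunk.
            let chunk = if i == 2 {
                Err("simulated parquet read error".to_string())
            } else {
                Ok(vec![i as f64; 4])
            };
            if tx.send(chunk).is_err() {
                break; // consumer hung up
            }
        }
        // Dropping tx closes the channel; the consumer sees that as end of stream.
    });

    while let Ok(msg) = rx.recv() {
        match msg {
            Ok(chunk) => println!("process {} elements", chunk.len()),
            Err(e) => {
                // Surface the producer's failure instead of truncating silently.
                eprintln!("aborting encode: {}", e);
                break;
            }
        }
    }
    let _ = producer.join();
}
```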
##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +166,160 @@ impl QdpEngine {
) -> Result<*mut DLManagedTensor> {
crate::profile_scope!("Mahout::EncodeFromParquet");
- // Read Parquet directly using Arrow (faster than pandas)
- let (batch_data, num_samples, sample_size) = {
- crate::profile_scope!("IO::ReadParquetBatch");
- crate::io::read_parquet_batch(path)?
- };
+ #[cfg(target_os = "linux")]
+ {
+ if encoding_method != "amplitude" {
+ return Err(MahoutError::NotImplemented("Only amplitude
encoding supported for streaming".into()));
+ }
- // Encode using fused batch kernel
- self.encode_batch(&batch_data, num_samples, sample_size, num_qubits,
encoding_method)
+ // Initialize reader
+ let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+ let num_samples = reader_core.total_samples;
+
+ // Allocate GPU memory once
+ let total_state_vector = GpuStateVector::new_batch(&self.device,
num_samples, num_qubits)?;
+
+ // Initialize dual-stream pipeline context
+ let ctx = PipelineContext::new(&self.device)?;
+
+ // Double-buffered device input (ping-pong)
+ let dev_in_a = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+ .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}",
e)))?;
+ let dev_in_b = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+ .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}",
e)))?;
+
+ // Setup Producer-Consumer channels
+ let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer,
usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+ let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>,
Receiver<PinnedBuffer>) = sync_channel(2);
+
+ // CRITICAL FIX: Pre-read first chunk to determine sample_size
+ // This data must be processed, not discarded!
+ let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+ let first_len =
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+ let sample_size = reader_core.get_sample_size()
+ .ok_or_else(|| MahoutError::InvalidInput("Could not determine
sample size".into()))?;
+
+ // Send first chunk directly to GPU loop (must be processed first)
+ full_buf_tx.send((host_buf_first, first_len))
+ .map_err(|_| MahoutError::Io("Failed to send first
buffer".into()))?;
+
+ // Send one empty buffer to IO thread for subsequent reads
+ empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+ .map_err(|_| MahoutError::Io("Failed to send second
buffer".into()))?;
+
+ // Spawn IO thread (Producer): continues reading from second chunk
onwards
+ let mut reader = reader_core;
+ let io_handle = thread::spawn(move || {
+ loop {
+ let mut buffer = match empty_buf_rx.recv() {
+ Ok(b) => b,
+ Err(_) => break,
+ };
+
+ let len = match reader.read_chunk(buffer.as_slice_mut()) {
+ Ok(l) => l,
+ Err(e) => { eprintln!("IO Error: {:?}", e); 0 }
+ };
+
+ if full_buf_tx.send((buffer, len)).is_err() { break; }
+ if len == 0 { break; }
+ }
+ });
+
+ // GPU processing loop: receives pre-read chunk, then IO thread
chunks
+ let mut global_sample_offset = 0;
+ let mut use_dev_a = true;
+ let state_len_per_sample = 1 << num_qubits;
+
+ loop {
+ let (host_buffer, current_len) = full_buf_rx.recv()
+ .map_err(|_| MahoutError::Io("IO thread
disconnected".into()))?;
+
+ // len == 0 means IO thread finished (don't recycle buffer)
+ if current_len == 0 { break; }
+
+ let samples_in_chunk = current_len / sample_size;
+ if samples_in_chunk > 0 {
+ let dev_ptr = if use_dev_a { *dev_in_a.device_ptr() } else
{ *dev_in_b.device_ptr() };
+
+ unsafe {
+ crate::profile_scope!("GPU::Dispatch");
+
+ // Async H2D copy → record event → wait for copy →
launch kernel
+ ctx.async_copy_to_device(&host_buffer, dev_ptr as *mut
c_void, current_len);
+ ctx.record_copy_done();
+ ctx.wait_for_copy();
+
+ // Compute norms and encode batch
+ {
+ crate::profile_scope!("GPU::BatchEncode");
+ let offset_elements = global_sample_offset *
state_len_per_sample;
+ let state_ptr_offset =
total_state_vector.ptr().cast::<u8>()
+ .add(offset_elements *
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+ .cast::<std::ffi::c_void>();
+
+ // Allocate norm buffer for this chunk
+ let mut norm_buffer =
self.device.alloc_zeros::<f64>(samples_in_chunk)
+ .map_err(|e|
MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}",
e)))?;
+
+ // Step 1: Compute L2 norms for this chunk
+ {
+ crate::profile_scope!("GPU::NormBatch");
+ let ret = launch_l2_norm_batch(
+ dev_ptr as *const f64,
+ samples_in_chunk,
+ sample_size,
+ *norm_buffer.device_ptr_mut() as *mut f64,
+ ctx.stream_compute.stream as *mut c_void
+ );
+ if ret != 0 {
+ return
Err(MahoutError::KernelLaunch(format!("Norm kernel error: {}", ret)));
+ }
+ }
+
+ // Step 2: Encode batch using computed norms
+ {
+ crate::profile_scope!("GPU::EncodeBatch");
+ let ret = launch_amplitude_encode_batch(
+ dev_ptr as *const f64,
+ state_ptr_offset,
+ *norm_buffer.device_ptr() as *const f64,
+ samples_in_chunk,
+ sample_size,
+ state_len_per_sample,
+ ctx.stream_compute.stream as *mut c_void
+ );
+ if ret != 0 {
+ return
Err(MahoutError::KernelLaunch(format!("Encode kernel error: {}", ret)));
+ }
+ }
+ }
+
+ // Sync copy stream before buffer reuse
+ ctx.sync_copy_stream();
+ }
+ global_sample_offset += samples_in_chunk;
+ use_dev_a = !use_dev_a;
+ }
+
+ // Return buffer to IO thread (ignore errors if thread exited)
+ let _ = empty_buf_tx.send(host_buffer);
+ }
+
+ self.device.synchronize().map_err(|e|
MahoutError::Cuda(format!("{:?}", e)))?;
+ let _ = io_handle.join();
+
+ // Transfer ownership to DLPack (Arc handles ref counting)
+ let dlpack_ptr = total_state_vector.to_dlpack();
+ Ok(dlpack_ptr)
+ }
+
+ #[cfg(not(target_os = "linux"))]
+ {
+ let (batch_data, num_samples, sample_size) =
crate::io::read_parquet_batch(path)?;
+ self.encode_batch(&batch_data, num_samples, sample_size,
num_qubits, encoding_method)
+ }
}
Review Comment:
The new streaming pipeline in encode_from_parquet lacks test coverage. This
is a complex producer-consumer implementation with multi-threading, CUDA
streams, and error handling that should be thoroughly tested. Consider adding
tests for: empty files, single-chunk files, multi-chunk files, error scenarios
in IO thread, and correctness of the encoded output compared to the
non-streaming implementation.
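As a starting point, a CPU-only parity test between ParquetBlockReader and the existing read_parquet_batch could look like the sketch below; the fixture path is hypothetical, and a GPU-side test would additionally compare encode_from_parquet against encode_batch on the same file.
```rust
#[cfg(test)]
mod parquet_block_reader_tests {
    use super::*;

    #[test]
    fn streaming_reader_matches_batch_reader() -> Result<()> {
        // Hypothetical fixture: a small Parquet file with a List<Float64> column.
        let path = "tests/data/small.parquet";
        let (expected, num_samples, sample_size) = read_parquet_batch(path)?;

        let mut reader = ParquetBlockReader::new(path)?;
        assert_eq!(reader.total_samples, num_samples);

        // Stream the whole file in small chunks and concatenate.
        let mut streamed = Vec::new();
        let mut chunk = vec![0.0_f64; sample_size * 4]; // room for 4 samples per read
        loop {
            let n = reader.read_chunk(&mut chunk)?;
            if n == 0 {
                break;
            }
            streamed.extend_from_slice(&chunk[..n]);
        }

        assert_eq!(reader.get_sample_size(), Some(sample_size));
        assert_eq!(streamed, expected);
        Ok(())
    }
}
```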
##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +166,160 @@ impl QdpEngine {
) -> Result<*mut DLManagedTensor> {
crate::profile_scope!("Mahout::EncodeFromParquet");
- // Read Parquet directly using Arrow (faster than pandas)
- let (batch_data, num_samples, sample_size) = {
- crate::profile_scope!("IO::ReadParquetBatch");
- crate::io::read_parquet_batch(path)?
- };
+ #[cfg(target_os = "linux")]
+ {
+ if encoding_method != "amplitude" {
+ return Err(MahoutError::NotImplemented("Only amplitude
encoding supported for streaming".into()));
+ }
- // Encode using fused batch kernel
- self.encode_batch(&batch_data, num_samples, sample_size, num_qubits,
encoding_method)
+ // Initialize reader
+ let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+ let num_samples = reader_core.total_samples;
+
+ // Allocate GPU memory once
+ let total_state_vector = GpuStateVector::new_batch(&self.device,
num_samples, num_qubits)?;
+
+ // Initialize dual-stream pipeline context
+ let ctx = PipelineContext::new(&self.device)?;
+
+ // Double-buffered device input (ping-pong)
+ let dev_in_a = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+ .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}",
e)))?;
+ let dev_in_b = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+ .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}",
e)))?;
+
+ // Setup Producer-Consumer channels
+ let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer,
usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+ let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>,
Receiver<PinnedBuffer>) = sync_channel(2);
+
+ // CRITICAL FIX: Pre-read first chunk to determine sample_size
+ // This data must be processed, not discarded!
+ let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+ let first_len =
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+ let sample_size = reader_core.get_sample_size()
+ .ok_or_else(|| MahoutError::InvalidInput("Could not determine
sample size".into()))?;
+
+ // Send first chunk directly to GPU loop (must be processed first)
+ full_buf_tx.send((host_buf_first, first_len))
+ .map_err(|_| MahoutError::Io("Failed to send first
buffer".into()))?;
+
+ // Send one empty buffer to IO thread for subsequent reads
+ empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+ .map_err(|_| MahoutError::Io("Failed to send second
buffer".into()))?;
+
+ // Spawn IO thread (Producer): continues reading from second chunk
onwards
+ let mut reader = reader_core;
+ let io_handle = thread::spawn(move || {
+ loop {
+ let mut buffer = match empty_buf_rx.recv() {
+ Ok(b) => b,
+ Err(_) => break,
+ };
+
+ let len = match reader.read_chunk(buffer.as_slice_mut()) {
+ Ok(l) => l,
+ Err(e) => { eprintln!("IO Error: {:?}", e); 0 }
+ };
+
+ if full_buf_tx.send((buffer, len)).is_err() { break; }
+ if len == 0 { break; }
+ }
+ });
+
+ // GPU processing loop: receives pre-read chunk, then IO thread
chunks
+ let mut global_sample_offset = 0;
+ let mut use_dev_a = true;
+ let state_len_per_sample = 1 << num_qubits;
+
+ loop {
+ let (host_buffer, current_len) = full_buf_rx.recv()
+ .map_err(|_| MahoutError::Io("IO thread
disconnected".into()))?;
+
+ // len == 0 means IO thread finished (don't recycle buffer)
+ if current_len == 0 { break; }
+
+ let samples_in_chunk = current_len / sample_size;
+ if samples_in_chunk > 0 {
+ let dev_ptr = if use_dev_a { *dev_in_a.device_ptr() } else
{ *dev_in_b.device_ptr() };
+
+ unsafe {
+ crate::profile_scope!("GPU::Dispatch");
+
+ // Async H2D copy → record event → wait for copy →
launch kernel
+ ctx.async_copy_to_device(&host_buffer, dev_ptr as *mut
c_void, current_len);
+ ctx.record_copy_done();
+ ctx.wait_for_copy();
+
+ // Compute norms and encode batch
+ {
+ crate::profile_scope!("GPU::BatchEncode");
+ let offset_elements = global_sample_offset *
state_len_per_sample;
+ let state_ptr_offset =
total_state_vector.ptr().cast::<u8>()
+ .add(offset_elements *
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+ .cast::<std::ffi::c_void>();
+
+ // Allocate norm buffer for this chunk
+ let mut norm_buffer =
self.device.alloc_zeros::<f64>(samples_in_chunk)
+ .map_err(|e|
MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}",
e)))?;
Review Comment:
The norm buffer allocation happens inside the processing loop for each
chunk. This means the buffer is allocated and deallocated repeatedly for every
chunk processed. Consider allocating the norm buffer once outside the loop with
a size of STAGE_SIZE_ELEMENTS / sample_size to avoid repeated allocations, which
can fragment memory and reduce performance.
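A sketch of the hoisted allocation, reusing the PR's variables (sample_size is already known before the loop from the pre-read first chunk):
```rust
// Worst case: a full staging buffer holds this many samples.
let max_samples_per_chunk = STAGE_SIZE_ELEMENTS / sample_size;
let mut norm_buffer = self.device.alloc_zeros::<f64>(max_samples_per_chunk)
    .map_err(|e| MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", e)))?;

loop {
    // ... receive chunk, compute samples_in_chunk ...
    // Reuse the same buffer every iteration; the kernels only touch the first
    // samples_in_chunk entries, so no per-chunk allocation is needed.
}
```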
##########
qdp/qdp-core/src/gpu/pipeline.rs:
##########
@@ -24,7 +24,104 @@ use std::ffi::c_void;
use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
use crate::error::{MahoutError, Result};
#[cfg(target_os = "linux")]
-use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
+use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error,
PinnedBuffer};
+
+// CUDA FFI: centralized here to keep lib.rs clean
+#[cfg(target_os = "linux")]
+unsafe extern "C" {
+ fn cudaMemcpyAsync(dst: *mut c_void, src: *const c_void, count: usize,
kind: u32, stream: *mut c_void) -> i32;
+ fn cudaEventCreateWithFlags(event: *mut *mut c_void, flags: u32) -> i32;
+ fn cudaEventRecord(event: *mut c_void, stream: *mut c_void) -> i32;
+ fn cudaEventDestroy(event: *mut c_void) -> i32;
+ fn cudaStreamWaitEvent(stream: *mut c_void, event: *mut c_void, flags:
u32) -> i32;
+ fn cudaStreamSynchronize(stream: *mut c_void) -> i32;
+}
+
+#[cfg(target_os = "linux")]
+const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1;
+#[cfg(target_os = "linux")]
+const CUDA_EVENT_DISABLE_TIMING: u32 = 0x02;
+
+/// Dual-stream pipeline context: manages compute/copy streams and sync events
+#[cfg(target_os = "linux")]
+pub struct PipelineContext {
+ pub stream_compute: CudaStream,
+ pub stream_copy: CudaStream,
+ event_copy_done: *mut c_void,
+}
+
+#[cfg(target_os = "linux")]
+impl PipelineContext {
+ /// Create dual streams and sync event
+ pub fn new(device: &Arc<CudaDevice>) -> Result<Self> {
+ let stream_compute = device.fork_default_stream()
+ .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+ let stream_copy = device.fork_default_stream()
+ .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+
+ let mut event_copy_done: *mut c_void = std::ptr::null_mut();
+ unsafe {
+ let ret = cudaEventCreateWithFlags(&mut event_copy_done,
CUDA_EVENT_DISABLE_TIMING);
+ if ret != 0 {
+ return Err(MahoutError::Cuda(format!("Failed to create CUDA
event: {}", ret)));
+ }
+ }
+
+ Ok(Self {
+ stream_compute,
+ stream_copy,
+ event_copy_done,
+ })
+ }
+
+ /// Async H2D copy on copy stream
+ pub unsafe fn async_copy_to_device(&self, src: &PinnedBuffer, dst: *mut
c_void, len_elements: usize) {
+ crate::profile_scope!("GPU::H2D_Copy");
+ unsafe {
+ cudaMemcpyAsync(
+ dst,
+ src.ptr() as *const c_void,
+ len_elements * std::mem::size_of::<f64>(),
+ CUDA_MEMCPY_HOST_TO_DEVICE,
+ self.stream_copy.stream as *mut c_void
+ );
+ }
Review Comment:
The cudaMemcpyAsync return value is not checked. If the memory copy fails,
this will silently continue and potentially cause downstream errors that are
harder to diagnose. Add error checking for the return value and return an
appropriate error to ensure failures are caught early.
```suggestion
    pub unsafe fn async_copy_to_device(&self, src: &PinnedBuffer, dst: *mut c_void, len_elements: usize) -> Result<()> {
        crate::profile_scope!("GPU::H2D_Copy");
        unsafe {
            let ret = cudaMemcpyAsync(
                dst,
                src.ptr() as *const c_void,
                len_elements * std::mem::size_of::<f64>(),
                CUDA_MEMCPY_HOST_TO_DEVICE,
                self.stream_copy.stream as *mut c_void
            );
            if ret != 0 {
                return Err(MahoutError::Cuda(format!("cudaMemcpyAsync failed with error code: {}", ret)));
            }
        }
        Ok(())
```
##########
qdp/qdp-core/src/gpu/pipeline.rs:
##########
@@ -24,7 +24,104 @@ use std::ffi::c_void;
use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
use crate::error::{MahoutError, Result};
#[cfg(target_os = "linux")]
-use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
+use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error,
PinnedBuffer};
+
+// CUDA FFI: centralized here to keep lib.rs clean
+#[cfg(target_os = "linux")]
+unsafe extern "C" {
+ fn cudaMemcpyAsync(dst: *mut c_void, src: *const c_void, count: usize,
kind: u32, stream: *mut c_void) -> i32;
+ fn cudaEventCreateWithFlags(event: *mut *mut c_void, flags: u32) -> i32;
+ fn cudaEventRecord(event: *mut c_void, stream: *mut c_void) -> i32;
+ fn cudaEventDestroy(event: *mut c_void) -> i32;
+ fn cudaStreamWaitEvent(stream: *mut c_void, event: *mut c_void, flags:
u32) -> i32;
+ fn cudaStreamSynchronize(stream: *mut c_void) -> i32;
+}
+
+#[cfg(target_os = "linux")]
+const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1;
+#[cfg(target_os = "linux")]
+const CUDA_EVENT_DISABLE_TIMING: u32 = 0x02;
+
+/// Dual-stream pipeline context: manages compute/copy streams and sync events
+#[cfg(target_os = "linux")]
+pub struct PipelineContext {
+ pub stream_compute: CudaStream,
+ pub stream_copy: CudaStream,
+ event_copy_done: *mut c_void,
+}
+
+#[cfg(target_os = "linux")]
+impl PipelineContext {
+ /// Create dual streams and sync event
+ pub fn new(device: &Arc<CudaDevice>) -> Result<Self> {
+ let stream_compute = device.fork_default_stream()
+ .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+ let stream_copy = device.fork_default_stream()
+ .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+
+ let mut event_copy_done: *mut c_void = std::ptr::null_mut();
+ unsafe {
+ let ret = cudaEventCreateWithFlags(&mut event_copy_done,
CUDA_EVENT_DISABLE_TIMING);
+ if ret != 0 {
+ return Err(MahoutError::Cuda(format!("Failed to create CUDA
event: {}", ret)));
+ }
+ }
+
+ Ok(Self {
+ stream_compute,
+ stream_copy,
+ event_copy_done,
+ })
+ }
+
+ /// Async H2D copy on copy stream
+ pub unsafe fn async_copy_to_device(&self, src: &PinnedBuffer, dst: *mut
c_void, len_elements: usize) {
+ crate::profile_scope!("GPU::H2D_Copy");
+ unsafe {
+ cudaMemcpyAsync(
+ dst,
+ src.ptr() as *const c_void,
+ len_elements * std::mem::size_of::<f64>(),
+ CUDA_MEMCPY_HOST_TO_DEVICE,
+ self.stream_copy.stream as *mut c_void
+ );
+ }
+ }
+
+ /// Record copy completion event
+ pub unsafe fn record_copy_done(&self) {
+ unsafe {
+ cudaEventRecord(self.event_copy_done, self.stream_copy.stream as
*mut c_void);
+ }
+ }
+
+ /// Make compute stream wait for copy completion
+ pub unsafe fn wait_for_copy(&self) {
+ crate::profile_scope!("GPU::StreamWait");
+ unsafe {
+ cudaStreamWaitEvent(self.stream_compute.stream as *mut c_void,
self.event_copy_done, 0);
+ }
Review Comment:
The cudaEventRecord and cudaStreamWaitEvent calls at lines 94 and 102 don't
check their return values. If these operations fail, the stream synchronization
may not work correctly, leading to race conditions where the compute stream
starts before data transfer completes. Add error checking and propagate errors
appropriately.
```suggestion
    pub unsafe fn record_copy_done(&self) -> Result<()> {
        unsafe {
            let ret = cudaEventRecord(self.event_copy_done, self.stream_copy.stream as *mut c_void);
            if ret != 0 {
                return Err(MahoutError::Cuda(format!("Failed to record CUDA event: {}", ret)));
            }
        }
        Ok(())
    }

    /// Make compute stream wait for copy completion
    pub unsafe fn wait_for_copy(&self) -> Result<()> {
        crate::profile_scope!("GPU::StreamWait");
        unsafe {
            let ret = cudaStreamWaitEvent(self.stream_compute.stream as *mut c_void, self.event_copy_done, 0);
            if ret != 0 {
                return Err(MahoutError::Cuda(format!("Failed to wait for CUDA event: {}", ret)));
            }
        }
        Ok(())
```
##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) ->
Result<(Vec<f64>, usize,
Ok((all_data, num_samples, sample_size))
}
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+ reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+ sample_size: Option<usize>,
+ leftover_data: Vec<f64>,
+ leftover_cursor: usize,
+ pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+ /// Create a new streaming Parquet reader
+ ///
+ /// Uses batch size 2048 for optimal throughput on large row groups.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let file = File::open(path.as_ref()).map_err(|e| {
+ MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+ })?;
+
+ let builder =
ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+ MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+ })?;
+
+ let total_rows = builder.metadata().file_metadata().num_rows() as
usize;
+
+ let reader = builder
+ .with_batch_size(2048) // Optimized for large row groups
Review Comment:
The magic number 2048 for batch size is documented as "Optimized for large
row groups" but lacks explanation of why this specific value was chosen.
Consider documenting the rationale (e.g., memory/performance tradeoffs tested)
or making it configurable if different datasets might benefit from different
batch sizes.
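One option is to lift the value into a documented constant (or a constructor parameter); the constant name below is illustrative, not part of the PR.
```rust
/// Rows decoded per Arrow record batch.
///
/// 2048 bounds per-batch host memory (rows x values-per-row x 8 bytes for f64)
/// while still amortizing per-batch reader overhead on large row groups.
/// Adjust if profiling on a particular dataset suggests otherwise.
const PARQUET_READ_BATCH_SIZE: usize = 2048;

// In ParquetBlockReader::new:
// let reader = builder
//     .with_batch_size(PARQUET_READ_BATCH_SIZE)
//     .build()
//     .map_err(|e| MahoutError::Io(format!("Failed to build Parquet reader: {}", e)))?;
```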
##########
qdp/qdp-core/src/lib.rs:
##########
@@ -26,10 +26,28 @@ mod profiling;
pub use error::{MahoutError, Result};
use std::sync::Arc;
+#[cfg(target_os = "linux")]
+use std::ffi::c_void;
+#[cfg(target_os = "linux")]
+use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
+#[cfg(target_os = "linux")]
+use std::thread;
-use cudarc::driver::CudaDevice;
+use cudarc::driver::{CudaDevice, DevicePtr, DevicePtrMut};
use crate::dlpack::DLManagedTensor;
use crate::gpu::get_encoder;
+#[cfg(target_os = "linux")]
+use crate::gpu::memory::{PinnedBuffer, GpuStateVector};
+#[cfg(target_os = "linux")]
+use crate::gpu::PipelineContext;
+#[cfg(target_os = "linux")]
+use qdp_kernels::{launch_l2_norm_batch, launch_amplitude_encode_batch};
+
+/// 512MB staging buffer for large Parquet row groups (reduces fragmentation)
Review Comment:
The constant STAGE_SIZE_BYTES is set to 512MB but there's no documentation
explaining why this size was chosen or how it should be adjusted for different
system configurations. Consider adding a comment explaining the rationale
(e.g., typical GPU memory, PCIe bandwidth considerations) and any constraints
or recommendations for different hardware setups.
```suggestion
/// 512MB staging buffer for large Parquet row groups (reduces fragmentation)
///
/// Rationale:
/// - 512MB is chosen as a balance between minimizing host-GPU transfer overhead and avoiding excessive GPU memory usage.
/// - This size is suitable for most modern GPUs, which typically have 8GB or more of memory, and aligns well with PCIe transfer sizes for efficient bandwidth utilization.
/// - Larger buffers can reduce fragmentation and improve throughput for large datasets, but may not fit on GPUs with less memory.
/// - If running on GPUs with less memory (e.g., 4GB or less), or if multiple processes share the GPU, consider reducing this value to avoid out-of-memory errors.
/// - Conversely, for high-end GPUs or systems with very large datasets, increasing this value may further improve performance, but test for fragmentation and memory pressure.
```
##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +166,160 @@ impl QdpEngine {
) -> Result<*mut DLManagedTensor> {
crate::profile_scope!("Mahout::EncodeFromParquet");
- // Read Parquet directly using Arrow (faster than pandas)
- let (batch_data, num_samples, sample_size) = {
- crate::profile_scope!("IO::ReadParquetBatch");
- crate::io::read_parquet_batch(path)?
- };
+ #[cfg(target_os = "linux")]
+ {
+ if encoding_method != "amplitude" {
+ return Err(MahoutError::NotImplemented("Only amplitude
encoding supported for streaming".into()));
+ }
- // Encode using fused batch kernel
- self.encode_batch(&batch_data, num_samples, sample_size, num_qubits,
encoding_method)
+ // Initialize reader
+ let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+ let num_samples = reader_core.total_samples;
+
+ // Allocate GPU memory once
+ let total_state_vector = GpuStateVector::new_batch(&self.device,
num_samples, num_qubits)?;
+
+ // Initialize dual-stream pipeline context
+ let ctx = PipelineContext::new(&self.device)?;
+
+ // Double-buffered device input (ping-pong)
+ let dev_in_a = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+ .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}",
e)))?;
+ let dev_in_b = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+ .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}",
e)))?;
+
+ // Setup Producer-Consumer channels
+ let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer,
usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+ let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>,
Receiver<PinnedBuffer>) = sync_channel(2);
+
+ // CRITICAL FIX: Pre-read first chunk to determine sample_size
+ // This data must be processed, not discarded!
+ let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+ let first_len =
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+ let sample_size = reader_core.get_sample_size()
+ .ok_or_else(|| MahoutError::InvalidInput("Could not determine
sample size".into()))?;
Review Comment:
The sample_size is determined after reading the first chunk, so if the Parquet
file is empty or the first chunk contains no data, sample_size will be None. The
ok_or_else then returns an error, but only after the full state vector and the
staging buffers have already been allocated on the GPU. Consider checking for an
empty file before allocating resources, or handling the case where the first read
returns 0 elements.
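A minimal guard, placed right after the reader is constructed and before any GPU allocation (sketch using the reader's existing total_samples field):
```rust
let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
if reader_core.total_samples == 0 {
    // Fail fast: nothing to encode, so skip GPU allocation and thread spawning.
    return Err(MahoutError::InvalidInput("Parquet file contains no samples".into()));
}
```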
##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +166,160 @@ impl QdpEngine {
) -> Result<*mut DLManagedTensor> {
crate::profile_scope!("Mahout::EncodeFromParquet");
- // Read Parquet directly using Arrow (faster than pandas)
- let (batch_data, num_samples, sample_size) = {
- crate::profile_scope!("IO::ReadParquetBatch");
- crate::io::read_parquet_batch(path)?
- };
+ #[cfg(target_os = "linux")]
+ {
+ if encoding_method != "amplitude" {
+ return Err(MahoutError::NotImplemented("Only amplitude
encoding supported for streaming".into()));
+ }
- // Encode using fused batch kernel
- self.encode_batch(&batch_data, num_samples, sample_size, num_qubits,
encoding_method)
+ // Initialize reader
+ let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+ let num_samples = reader_core.total_samples;
+
+ // Allocate GPU memory once
+ let total_state_vector = GpuStateVector::new_batch(&self.device,
num_samples, num_qubits)?;
+
+ // Initialize dual-stream pipeline context
+ let ctx = PipelineContext::new(&self.device)?;
+
+ // Double-buffered device input (ping-pong)
+ let dev_in_a = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+ .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}",
e)))?;
+ let dev_in_b = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+ .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}",
e)))?;
+
+ // Setup Producer-Consumer channels
+ let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer,
usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+ let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>,
Receiver<PinnedBuffer>) = sync_channel(2);
+
+ // CRITICAL FIX: Pre-read first chunk to determine sample_size
+ // This data must be processed, not discarded!
+ let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+ let first_len =
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+ let sample_size = reader_core.get_sample_size()
+ .ok_or_else(|| MahoutError::InvalidInput("Could not determine
sample size".into()))?;
+
+ // Send first chunk directly to GPU loop (must be processed first)
+ full_buf_tx.send((host_buf_first, first_len))
+ .map_err(|_| MahoutError::Io("Failed to send first
buffer".into()))?;
+
+ // Send one empty buffer to IO thread for subsequent reads
+ empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+ .map_err(|_| MahoutError::Io("Failed to send second
buffer".into()))?;
+
+ // Spawn IO thread (Producer): continues reading from second chunk
onwards
+ let mut reader = reader_core;
+ let io_handle = thread::spawn(move || {
+ loop {
+ let mut buffer = match empty_buf_rx.recv() {
+ Ok(b) => b,
+ Err(_) => break,
+ };
+
+ let len = match reader.read_chunk(buffer.as_slice_mut()) {
+ Ok(l) => l,
+ Err(e) => { eprintln!("IO Error: {:?}", e); 0 }
+ };
+
+ if full_buf_tx.send((buffer, len)).is_err() { break; }
+ if len == 0 { break; }
+ }
+ });
+
+ // GPU processing loop: receives pre-read chunk, then IO thread
chunks
+ let mut global_sample_offset = 0;
+ let mut use_dev_a = true;
+ let state_len_per_sample = 1 << num_qubits;
+
+ loop {
+ let (host_buffer, current_len) = full_buf_rx.recv()
+ .map_err(|_| MahoutError::Io("IO thread
disconnected".into()))?;
+
+ // len == 0 means IO thread finished (don't recycle buffer)
+ if current_len == 0 { break; }
+
+ let samples_in_chunk = current_len / sample_size;
+ if samples_in_chunk > 0 {
+ let dev_ptr = if use_dev_a { *dev_in_a.device_ptr() } else
{ *dev_in_b.device_ptr() };
+
+ unsafe {
+ crate::profile_scope!("GPU::Dispatch");
+
+ // Async H2D copy → record event → wait for copy →
launch kernel
+ ctx.async_copy_to_device(&host_buffer, dev_ptr as *mut
c_void, current_len);
+ ctx.record_copy_done();
+ ctx.wait_for_copy();
+
+ // Compute norms and encode batch
+ {
+ crate::profile_scope!("GPU::BatchEncode");
+ let offset_elements = global_sample_offset *
state_len_per_sample;
+ let state_ptr_offset =
total_state_vector.ptr().cast::<u8>()
+ .add(offset_elements *
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+ .cast::<std::ffi::c_void>();
+
+ // Allocate norm buffer for this chunk
+ let mut norm_buffer =
self.device.alloc_zeros::<f64>(samples_in_chunk)
+ .map_err(|e|
MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}",
e)))?;
+
+ // Step 1: Compute L2 norms for this chunk
+ {
+ crate::profile_scope!("GPU::NormBatch");
+ let ret = launch_l2_norm_batch(
+ dev_ptr as *const f64,
+ samples_in_chunk,
+ sample_size,
+ *norm_buffer.device_ptr_mut() as *mut f64,
+ ctx.stream_compute.stream as *mut c_void
+ );
+ if ret != 0 {
+ return
Err(MahoutError::KernelLaunch(format!("Norm kernel error: {}", ret)));
+ }
+ }
+
+ // Step 2: Encode batch using computed norms
+ {
+ crate::profile_scope!("GPU::EncodeBatch");
+ let ret = launch_amplitude_encode_batch(
+ dev_ptr as *const f64,
+ state_ptr_offset,
+ *norm_buffer.device_ptr() as *const f64,
+ samples_in_chunk,
+ sample_size,
+ state_len_per_sample,
+ ctx.stream_compute.stream as *mut c_void
+ );
+ if ret != 0 {
+ return
Err(MahoutError::KernelLaunch(format!("Encode kernel error: {}", ret)));
+ }
+ }
+ }
+
+ // Sync copy stream before buffer reuse
+ ctx.sync_copy_stream();
+ }
Review Comment:
The unsafe block spans over 50 lines of code (lines 246-301), which makes it
difficult to audit for safety. Consider minimizing the unsafe scope by wrapping
only the actual unsafe operations (CUDA FFI calls) in unsafe blocks, while
keeping safe Rust operations outside. This improves code maintainability and
makes it easier to verify safety invariants.
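A sketch of the narrowed scope for one kernel launch, keeping the safe arithmetic and allocation outside and wrapping only the pointer offsetting and FFI calls (fragment reusing the PR's variables):
```rust
// Safe code: offsets and the per-chunk norm buffer.
let offset_elements = global_sample_offset * state_len_per_sample;
let mut norm_buffer = self.device.alloc_zeros::<f64>(samples_in_chunk)
    .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?;

// Unsafe only where actually required.
let state_ptr_offset = unsafe {
    total_state_vector.ptr().cast::<u8>()
        .add(offset_elements * std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
        .cast::<std::ffi::c_void>()
};
let ret = unsafe {
    launch_l2_norm_batch(
        dev_ptr as *const f64,
        samples_in_chunk,
        sample_size,
        *norm_buffer.device_ptr_mut() as *mut f64,
        ctx.stream_compute.stream as *mut c_void,
    )
};
if ret != 0 {
    return Err(MahoutError::KernelLaunch(format!("Norm kernel error: {}", ret)));
}
```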
##########
qdp/qdp-core/src/gpu/memory.rs:
##########
@@ -263,3 +266,79 @@ impl GpuStateVector {
}
}
}
+
+// === Pinned Memory Implementation ===
+
+/// Pinned Host Memory Buffer (Page-Locked)
+///
+/// Enables DMA for H2D copies, doubling bandwidth and reducing CPU usage.
+#[cfg(target_os = "linux")]
+pub struct PinnedBuffer {
+ ptr: *mut f64,
+ size_elements: usize,
+}
+
+#[cfg(target_os = "linux")]
+impl PinnedBuffer {
+ /// Allocate pinned memory
+ pub fn new(elements: usize) -> Result<Self> {
+ unsafe {
+ let bytes = elements * std::mem::size_of::<f64>();
+ let mut ptr: *mut c_void = std::ptr::null_mut();
+
+ unsafe extern "C" {
+ fn cudaHostAlloc(pHost: *mut *mut c_void, size: usize, flags:
u32) -> i32;
+ }
Review Comment:
The unsafe extern "C" block is declared inside a function scope (lines
289-291). This is unconventional and reduces readability. The same function is
also redeclared in pipeline.rs (line 332). Consider declaring all CUDA FFI
functions once in a centralized location (e.g., at the module level in
pipeline.rs) and importing them where needed to avoid duplication and improve
maintainability.
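For example, a single shared declaration block that both memory.rs and pipeline.rs import (the module name and location are illustrative):
```rust
// Centralized raw CUDA runtime declarations, e.g. in gpu/pipeline.rs or a
// dedicated gpu/cuda_ffi.rs, instead of per-function extern blocks.
#[cfg(target_os = "linux")]
pub(crate) mod cuda_ffi {
    use std::ffi::c_void;

    unsafe extern "C" {
        pub fn cudaHostAlloc(p_host: *mut *mut c_void, size: usize, flags: u32) -> i32;
        pub fn cudaFreeHost(ptr: *mut c_void) -> i32;
        pub fn cudaMemcpyAsync(dst: *mut c_void, src: *const c_void, count: usize, kind: u32, stream: *mut c_void) -> i32;
        pub fn cudaStreamSynchronize(stream: *mut c_void) -> i32;
    }
}

// memory.rs would then share the declarations:
// use crate::gpu::pipeline::cuda_ffi::{cudaHostAlloc, cudaFreeHost};
```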
##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) ->
Result<(Vec<f64>, usize,
Ok((all_data, num_samples, sample_size))
}
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+ reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+ sample_size: Option<usize>,
+ leftover_data: Vec<f64>,
+ leftover_cursor: usize,
+ pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+ /// Create a new streaming Parquet reader
+ ///
+ /// Uses batch size 2048 for optimal throughput on large row groups.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let file = File::open(path.as_ref()).map_err(|e| {
+ MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+ })?;
+
+ let builder =
ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+ MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+ })?;
+
+ let total_rows = builder.metadata().file_metadata().num_rows() as
usize;
+
+ let reader = builder
+ .with_batch_size(2048) // Optimized for large row groups
+ .build()
+ .map_err(|e| {
+ MahoutError::Io(format!("Failed to build Parquet reader: {}",
e))
+ })?;
+
+ Ok(Self {
+ reader,
+ sample_size: None,
+ leftover_data: Vec::new(),
+ leftover_cursor: 0,
+ total_samples: total_rows,
+ })
+ }
+
+ /// Get the sample size (number of elements per sample)
+ pub fn get_sample_size(&self) -> Option<usize> {
+ self.sample_size
+ }
+
+ /// Read a chunk of data into the provided buffer
+ ///
+ /// Handles leftover data from previous reads and ensures sample
boundaries are respected.
+ /// Returns the number of elements written to the buffer.
+ pub fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+ let mut written = 0;
+ let buf_cap = buffer.len();
+ let calc_limit = |ss: usize| -> usize {
+ if ss == 0 {
+ buf_cap
+ } else {
+ (buf_cap / ss) * ss
+ }
+ };
+ let mut limit = self.sample_size.map_or(buf_cap, calc_limit);
+
+ // Drain leftover data from previous read
+ if self.leftover_cursor < self.leftover_data.len() {
+ let available = self.leftover_data.len() - self.leftover_cursor;
+ let space_left = limit - written;
+ let to_copy = std::cmp::min(available, space_left);
+
+ if to_copy > 0 {
+ buffer[written..written+to_copy].copy_from_slice(
+
&self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy]
+ );
+ written += to_copy;
+ self.leftover_cursor += to_copy;
+ if self.leftover_cursor == self.leftover_data.len() {
+ self.leftover_data.clear();
+ self.leftover_cursor = 0;
+ }
+ }
+ if written == limit {
+ return Ok(written);
+ }
+ }
+
+ // Read new batches from Parquet
+ while written < limit {
+ match self.reader.next() {
+ Some(Ok(batch)) => {
+ if batch.num_columns() == 0 {
+ continue;
+ }
+ let column = batch.column(0);
+
+ let list_array = column
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| MahoutError::Io("Failed to downcast to
ListArray".to_string()))?;
+
+ if list_array.len() == 0 {
+ continue;
+ }
+
+ // Extract sample size from first element
+ let first_value = list_array.value(0);
+ let float_array = first_value
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .ok_or_else(|| MahoutError::Io("List values must be
Float64".to_string()))?;
+
+ let current_sample_size = float_array.len();
+
+ if self.sample_size.is_none() {
+ self.sample_size = Some(current_sample_size);
+ limit = calc_limit(current_sample_size);
+ }
+
+ // Extract all values from this batch
+ let mut batch_values = Vec::new();
+ for i in 0..list_array.len() {
+ let value_array = list_array.value(i);
+ let float_array = value_array
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .ok_or_else(|| MahoutError::Io("List values must
be Float64".to_string()))?;
+
+ if float_array.null_count() == 0 {
+
batch_values.extend_from_slice(float_array.values());
+ } else {
+ batch_values.extend(float_array.iter().map(|opt|
opt.unwrap_or(0.0)));
+ }
+ }
+
+ let available = batch_values.len();
+ let space_left = if written >= limit { 0 } else { limit -
written };
+
+ if available <= space_left {
+
buffer[written..written+available].copy_from_slice(&batch_values);
+ written += available;
+ } else {
+ if space_left > 0 {
+
buffer[written..written+space_left].copy_from_slice(&batch_values[0..space_left]);
+ written += space_left;
+ }
+ // Save overflow as leftover for next read
+ self.leftover_data.clear();
+
self.leftover_data.extend_from_slice(&batch_values[space_left..]);
+ self.leftover_cursor = 0;
+ break;
+ }
+ },
+ Some(Err(e)) => return Err(MahoutError::Io(format!("Parquet
read error: {}", e))),
+ None => break,
+ }
+ }
+
+ Ok(written)
+ }
Review Comment:
The function returns Ok(written) when the reader yields None (end of file, the
None case at line 595). However, if written is 0 while the file still has data,
for example because the buffer is smaller than one sample, the caller cannot
distinguish that from end of file and the condition is silently ignored. Consider
documenting this behavior more clearly or adding a check that distinguishes "end
of file" from "no data written due to buffer constraints".
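One way to make the second case explicit is a guard at the top of read_chunk (sketch):
```rust
// Distinguish "buffer too small to hold even one sample" from end of file.
if let Some(ss) = self.sample_size {
    if ss > 0 && buffer.len() < ss {
        return Err(MahoutError::InvalidInput(format!(
            "read_chunk buffer ({} elements) is smaller than one sample ({} elements)",
            buffer.len(),
            ss
        )));
    }
}
```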
##########
qdp/qdp-core/src/gpu/pipeline.rs:
##########
@@ -24,7 +24,104 @@ use std::ffi::c_void;
use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
use crate::error::{MahoutError, Result};
#[cfg(target_os = "linux")]
-use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
+use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error,
PinnedBuffer};
+
+// CUDA FFI: centralized here to keep lib.rs clean
+#[cfg(target_os = "linux")]
+unsafe extern "C" {
+ fn cudaMemcpyAsync(dst: *mut c_void, src: *const c_void, count: usize,
kind: u32, stream: *mut c_void) -> i32;
+ fn cudaEventCreateWithFlags(event: *mut *mut c_void, flags: u32) -> i32;
+ fn cudaEventRecord(event: *mut c_void, stream: *mut c_void) -> i32;
+ fn cudaEventDestroy(event: *mut c_void) -> i32;
+ fn cudaStreamWaitEvent(stream: *mut c_void, event: *mut c_void, flags:
u32) -> i32;
+ fn cudaStreamSynchronize(stream: *mut c_void) -> i32;
+}
+
+#[cfg(target_os = "linux")]
+const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1;
+#[cfg(target_os = "linux")]
+const CUDA_EVENT_DISABLE_TIMING: u32 = 0x02;
+
+/// Dual-stream pipeline context: manages compute/copy streams and sync events
+#[cfg(target_os = "linux")]
+pub struct PipelineContext {
+ pub stream_compute: CudaStream,
+ pub stream_copy: CudaStream,
+ event_copy_done: *mut c_void,
+}
+
+#[cfg(target_os = "linux")]
+impl PipelineContext {
+ /// Create dual streams and sync event
+ pub fn new(device: &Arc<CudaDevice>) -> Result<Self> {
+ let stream_compute = device.fork_default_stream()
+ .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+ let stream_copy = device.fork_default_stream()
+ .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+
+ let mut event_copy_done: *mut c_void = std::ptr::null_mut();
+ unsafe {
+ let ret = cudaEventCreateWithFlags(&mut event_copy_done,
CUDA_EVENT_DISABLE_TIMING);
+ if ret != 0 {
+ return Err(MahoutError::Cuda(format!("Failed to create CUDA
event: {}", ret)));
+ }
+ }
+
+ Ok(Self {
+ stream_compute,
+ stream_copy,
+ event_copy_done,
+ })
+ }
+
+ /// Async H2D copy on copy stream
+ pub unsafe fn async_copy_to_device(&self, src: &PinnedBuffer, dst: *mut
c_void, len_elements: usize) {
+ crate::profile_scope!("GPU::H2D_Copy");
+ unsafe {
+ cudaMemcpyAsync(
+ dst,
+ src.ptr() as *const c_void,
+ len_elements * std::mem::size_of::<f64>(),
+ CUDA_MEMCPY_HOST_TO_DEVICE,
+ self.stream_copy.stream as *mut c_void
+ );
+ }
+ }
+
+ /// Record copy completion event
+ pub unsafe fn record_copy_done(&self) {
+ unsafe {
+ cudaEventRecord(self.event_copy_done, self.stream_copy.stream as
*mut c_void);
+ }
+ }
+
+ /// Make compute stream wait for copy completion
+ pub unsafe fn wait_for_copy(&self) {
+ crate::profile_scope!("GPU::StreamWait");
+ unsafe {
+ cudaStreamWaitEvent(self.stream_compute.stream as *mut c_void,
self.event_copy_done, 0);
+ }
+ }
+
+ /// Sync copy stream (safe to reuse host buffer)
+ pub unsafe fn sync_copy_stream(&self) {
+ crate::profile_scope!("Pipeline::SyncCopy");
+ unsafe {
+ cudaStreamSynchronize(self.stream_copy.stream as *mut c_void);
+ }
Review Comment:
The cudaStreamSynchronize call at line 110 doesn't check its return value.
If synchronization fails, the function will return without ensuring the copy
stream has completed, which could lead to use-after-free bugs when the host
buffer is reused. Add error checking and propagate the error.
```suggestion
    pub unsafe fn sync_copy_stream(&self) -> Result<()> {
        crate::profile_scope!("Pipeline::SyncCopy");
        unsafe {
            let ret = cudaStreamSynchronize(self.stream_copy.stream as *mut c_void);
            if ret != 0 {
                return Err(MahoutError::Cuda(format!("Failed to synchronize CUDA copy stream: {}", ret)));
            }
        }
        Ok(())
```
##########
qdp/qdp-core/src/gpu/memory.rs:
##########
@@ -263,3 +266,79 @@ impl GpuStateVector {
}
}
}
+
+// === Pinned Memory Implementation ===
+
+/// Pinned Host Memory Buffer (Page-Locked)
+///
+/// Enables DMA for H2D copies, doubling bandwidth and reducing CPU usage.
+#[cfg(target_os = "linux")]
+pub struct PinnedBuffer {
+ ptr: *mut f64,
+ size_elements: usize,
+}
+
+#[cfg(target_os = "linux")]
+impl PinnedBuffer {
+ /// Allocate pinned memory
+ pub fn new(elements: usize) -> Result<Self> {
+ unsafe {
+ let bytes = elements * std::mem::size_of::<f64>();
+ let mut ptr: *mut c_void = std::ptr::null_mut();
+
+ unsafe extern "C" {
+ fn cudaHostAlloc(pHost: *mut *mut c_void, size: usize, flags:
u32) -> i32;
+ }
+
+ let ret = cudaHostAlloc(&mut ptr, bytes, 0); //
cudaHostAllocDefault
+
+ if ret != 0 {
+ return Err(MahoutError::MemoryAllocation(
+ format!("cudaHostAlloc failed with error code: {}", ret)
+ ));
+ }
+
+ Ok(Self {
+ ptr: ptr as *mut f64,
+ size_elements: elements,
+ })
+ }
+ }
+
+ /// Get mutable slice to write data into
+ pub fn as_slice_mut(&mut self) -> &mut [f64] {
+ unsafe { std::slice::from_raw_parts_mut(self.ptr, self.size_elements) }
+ }
+
+ /// Get raw pointer for CUDA memcpy
+ pub fn ptr(&self) -> *const f64 {
+ self.ptr
+ }
+
+ pub fn len(&self) -> usize {
+ self.size_elements
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.size_elements == 0
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl Drop for PinnedBuffer {
+ fn drop(&mut self) {
+ unsafe {
+ unsafe extern "C" {
+ fn cudaFreeHost(ptr: *mut c_void) -> i32;
+ }
+ let _ = cudaFreeHost(self.ptr as *mut c_void);
+ }
+ }
+}
+
+// Safety: Pinned memory is accessible from any thread
+#[cfg(target_os = "linux")]
+unsafe impl Send for PinnedBuffer {}
+
+#[cfg(target_os = "linux")]
+unsafe impl Sync for PinnedBuffer {}
Review Comment:
The PinnedBuffer implementation lacks test coverage. Given that the codebase
has memory safety tests (tests/memory_safety.rs exists), this new memory
allocation mechanism should have tests to verify proper allocation,
deallocation, thread safety, and interaction with CUDA operations.
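A sketch of what such tests could look like; they require a CUDA-capable host, so they would sit behind the same gating the existing GPU tests use.
```rust
#[cfg(all(test, target_os = "linux"))]
mod pinned_buffer_tests {
    use super::*;

    #[test]
    fn alloc_write_read_roundtrip() -> Result<()> {
        let mut buf = PinnedBuffer::new(1024)?;
        assert_eq!(buf.len(), 1024);
        assert!(!buf.is_empty());

        // Pinned memory is writable and readable as an ordinary slice.
        for (i, v) in buf.as_slice_mut().iter_mut().enumerate() {
            *v = i as f64;
        }
        assert_eq!(buf.as_slice_mut()[1023], 1023.0);
        // Drop frees the allocation via cudaFreeHost.
        Ok(())
    }

    #[test]
    fn can_be_moved_across_threads() -> Result<()> {
        let buf = PinnedBuffer::new(16)?;
        std::thread::spawn(move || assert_eq!(buf.len(), 16))
            .join()
            .expect("thread panicked");
        Ok(())
    }
}
```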
##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +166,160 @@ impl QdpEngine {
) -> Result<*mut DLManagedTensor> {
crate::profile_scope!("Mahout::EncodeFromParquet");
- // Read Parquet directly using Arrow (faster than pandas)
- let (batch_data, num_samples, sample_size) = {
- crate::profile_scope!("IO::ReadParquetBatch");
- crate::io::read_parquet_batch(path)?
- };
+ #[cfg(target_os = "linux")]
+ {
+ if encoding_method != "amplitude" {
+ return Err(MahoutError::NotImplemented("Only amplitude
encoding supported for streaming".into()));
+ }
- // Encode using fused batch kernel
- self.encode_batch(&batch_data, num_samples, sample_size, num_qubits,
encoding_method)
+ // Initialize reader
+ let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+ let num_samples = reader_core.total_samples;
+
+ // Allocate GPU memory once
+ let total_state_vector = GpuStateVector::new_batch(&self.device,
num_samples, num_qubits)?;
+
+ // Initialize dual-stream pipeline context
+ let ctx = PipelineContext::new(&self.device)?;
+
+ // Double-buffered device input (ping-pong)
+ let dev_in_a = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+ .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}",
e)))?;
+ let dev_in_b = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+ .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}",
e)))?;
+
+ // Setup Producer-Consumer channels
+ let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer,
usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+ let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>,
Receiver<PinnedBuffer>) = sync_channel(2);
+
+ // CRITICAL FIX: Pre-read first chunk to determine sample_size
+ // This data must be processed, not discarded!
+ let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+ let first_len =
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+ let sample_size = reader_core.get_sample_size()
+ .ok_or_else(|| MahoutError::InvalidInput("Could not determine
sample size".into()))?;
+
+ // Send first chunk directly to GPU loop (must be processed first)
+ full_buf_tx.send((host_buf_first, first_len))
+ .map_err(|_| MahoutError::Io("Failed to send first
buffer".into()))?;
+
+ // Send one empty buffer to IO thread for subsequent reads
+ empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+ .map_err(|_| MahoutError::Io("Failed to send second
buffer".into()))?;
+
+ // Spawn IO thread (Producer): continues reading from second chunk
onwards
+ let mut reader = reader_core;
+ let io_handle = thread::spawn(move || {
+ loop {
+ let mut buffer = match empty_buf_rx.recv() {
+ Ok(b) => b,
+ Err(_) => break,
+ };
+
+ let len = match reader.read_chunk(buffer.as_slice_mut()) {
+ Ok(l) => l,
+ Err(e) => { eprintln!("IO Error: {:?}", e); 0 }
+ };
+
+ if full_buf_tx.send((buffer, len)).is_err() { break; }
+ if len == 0 { break; }
+ }
+ });
+
+ // GPU processing loop: receives pre-read chunk, then IO thread
chunks
+ let mut global_sample_offset = 0;
+ let mut use_dev_a = true;
+ let state_len_per_sample = 1 << num_qubits;
+
+ loop {
+ let (host_buffer, current_len) = full_buf_rx.recv()
+ .map_err(|_| MahoutError::Io("IO thread
disconnected".into()))?;
+
+ // len == 0 means IO thread finished (don't recycle buffer)
+ if current_len == 0 { break; }
+
+ let samples_in_chunk = current_len / sample_size;
+ if samples_in_chunk > 0 {
+ let dev_ptr = if use_dev_a { *dev_in_a.device_ptr() } else
{ *dev_in_b.device_ptr() };
+
+ unsafe {
+ crate::profile_scope!("GPU::Dispatch");
+
+ // Async H2D copy → record event → wait for copy →
launch kernel
+ ctx.async_copy_to_device(&host_buffer, dev_ptr as *mut
c_void, current_len);
+ ctx.record_copy_done();
+ ctx.wait_for_copy();
+
+ // Compute norms and encode batch
+ {
+ crate::profile_scope!("GPU::BatchEncode");
+ let offset_elements = global_sample_offset *
state_len_per_sample;
+ let state_ptr_offset =
total_state_vector.ptr().cast::<u8>()
+ .add(offset_elements *
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+ .cast::<std::ffi::c_void>();
+
+ // Allocate norm buffer for this chunk
+ let mut norm_buffer =
self.device.alloc_zeros::<f64>(samples_in_chunk)
+ .map_err(|e|
MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}",
e)))?;
+
+ // Step 1: Compute L2 norms for this chunk
+ {
+ crate::profile_scope!("GPU::NormBatch");
+ let ret = launch_l2_norm_batch(
+ dev_ptr as *const f64,
+ samples_in_chunk,
+ sample_size,
+ *norm_buffer.device_ptr_mut() as *mut f64,
+ ctx.stream_compute.stream as *mut c_void
+ );
+ if ret != 0 {
+ return
Err(MahoutError::KernelLaunch(format!("Norm kernel error: {}", ret)));
+ }
+ }
+
+ // Step 2: Encode batch using computed norms
+ {
+ crate::profile_scope!("GPU::EncodeBatch");
+ let ret = launch_amplitude_encode_batch(
+ dev_ptr as *const f64,
+ state_ptr_offset,
+ *norm_buffer.device_ptr() as *const f64,
+ samples_in_chunk,
+ sample_size,
+ state_len_per_sample,
+ ctx.stream_compute.stream as *mut c_void
+ );
+ if ret != 0 {
+ return
Err(MahoutError::KernelLaunch(format!("Encode kernel error: {}", ret)));
+ }
+ }
+ }
+
+ // Sync copy stream before buffer reuse
+ ctx.sync_copy_stream();
+ }
+ global_sample_offset += samples_in_chunk;
+ use_dev_a = !use_dev_a;
+ }
+
+ // Return buffer to IO thread (ignore errors if thread exited)
+ let _ = empty_buf_tx.send(host_buffer);
+ }
+
+ self.device.synchronize().map_err(|e|
MahoutError::Cuda(format!("{:?}", e)))?;
+ let _ = io_handle.join();
Review Comment:
The result of join is discarded with `let _ = io_handle.join()`, so a panic in
the IO thread is silently swallowed. A panic there could indicate a serious issue
that should be surfaced. Consider checking the join result and logging or
returning an error if the thread panicked.
```suggestion
        io_handle.join().map_err(|e| MahoutError::Io(format!("IO thread panicked: {:?}", e)))?;
```
##########
qdp/qdp-core/src/lib.rs:
##########
@@ -47,9 +65,16 @@ impl QdpEngine {
pub fn new(device_id: usize) -> Result<Self> {
let device = CudaDevice::new(device_id)
.map_err(|e| MahoutError::Cuda(format!("Failed to initialize CUDA
device {}: {:?}", device_id, e)))?;
- Ok(Self {
- device // CudaDevice::new already returns Arc<CudaDevice> in
cudarc 0.11
- })
+
+ #[cfg(target_os = "linux")]
+ {
+ Ok(Self { device })
+ }
+
+ #[cfg(not(target_os = "linux"))]
+ {
+ Ok(Self { device })
+ }
Review Comment:
The conditional compilation for QdpEngine::new creates identical code for
both Linux and non-Linux targets (lines 69-77). This redundancy can be
simplified by removing the cfg attributes entirely, as the code is the same
regardless of platform.
```suggestion
Ok(Self { device })
```
##########
qdp/qdp-core/src/lib.rs:
##########
@@ -145,14 +166,160 @@ impl QdpEngine {
) -> Result<*mut DLManagedTensor> {
crate::profile_scope!("Mahout::EncodeFromParquet");
- // Read Parquet directly using Arrow (faster than pandas)
- let (batch_data, num_samples, sample_size) = {
- crate::profile_scope!("IO::ReadParquetBatch");
- crate::io::read_parquet_batch(path)?
- };
+ #[cfg(target_os = "linux")]
+ {
+ if encoding_method != "amplitude" {
+ return Err(MahoutError::NotImplemented("Only amplitude
encoding supported for streaming".into()));
+ }
- // Encode using fused batch kernel
- self.encode_batch(&batch_data, num_samples, sample_size, num_qubits,
encoding_method)
+ // Initialize reader
+ let mut reader_core = crate::io::ParquetBlockReader::new(path)?;
+ let num_samples = reader_core.total_samples;
+
+ // Allocate GPU memory once
+ let total_state_vector = GpuStateVector::new_batch(&self.device,
num_samples, num_qubits)?;
+
+ // Initialize dual-stream pipeline context
+ let ctx = PipelineContext::new(&self.device)?;
+
+ // Double-buffered device input (ping-pong)
+ let dev_in_a = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+ .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}",
e)))?;
+ let dev_in_b = unsafe {
self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+ .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}",
e)))?;
+
+ // Setup Producer-Consumer channels
+ let (full_buf_tx, full_buf_rx): (SyncSender<(PinnedBuffer,
usize)>, Receiver<(PinnedBuffer, usize)>) = sync_channel(2);
+ let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>,
Receiver<PinnedBuffer>) = sync_channel(2);
+
+ // CRITICAL FIX: Pre-read first chunk to determine sample_size
+ // This data must be processed, not discarded!
+ let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+ let first_len =
reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+ let sample_size = reader_core.get_sample_size()
+ .ok_or_else(|| MahoutError::InvalidInput("Could not determine
sample size".into()))?;
+
+ // Send first chunk directly to GPU loop (must be processed first)
+ full_buf_tx.send((host_buf_first, first_len))
+ .map_err(|_| MahoutError::Io("Failed to send first
buffer".into()))?;
+
+ // Send one empty buffer to IO thread for subsequent reads
+ empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+ .map_err(|_| MahoutError::Io("Failed to send second
buffer".into()))?;
+
+ // Spawn IO thread (Producer): continues reading from second chunk
onwards
+ let mut reader = reader_core;
Review Comment:
The reader is moved into the IO thread without explicit documentation of
thread safety. While ParquetRecordBatchReader likely implements Send, this
should be verified and documented, especially since the struct maintains
internal state (sample_size, leftover_data) that could theoretically be
problematic if the reader's underlying implementation has thread-affinity
requirements.
```suggestion
            let mut reader = reader_core;
            // Compile-time check: the reader is moved into the IO thread, so it must
            // be Send. If ParquetBlockReader (or its inner ParquetRecordBatchReader)
            // ever stops being Send, this fails to compile rather than at runtime.
            fn assert_send<T: Send>(_: &T) {}
            assert_send(&reader);
```
##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) ->
Result<(Vec<f64>, usize,
Ok((all_data, num_samples, sample_size))
}
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+ reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+ sample_size: Option<usize>,
+ leftover_data: Vec<f64>,
+ leftover_cursor: usize,
+ pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+ /// Create a new streaming Parquet reader
+ ///
+ /// Uses batch size 2048 for optimal throughput on large row groups.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let file = File::open(path.as_ref()).map_err(|e| {
+ MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+ })?;
+
+ let builder =
ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+ MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+ })?;
+
+ let total_rows = builder.metadata().file_metadata().num_rows() as
usize;
+
+ let reader = builder
+ .with_batch_size(2048) // Optimized for large row groups
+ .build()
+ .map_err(|e| {
+ MahoutError::Io(format!("Failed to build Parquet reader: {}",
e))
+ })?;
+
+ Ok(Self {
+ reader,
+ sample_size: None,
+ leftover_data: Vec::new(),
+ leftover_cursor: 0,
+ total_samples: total_rows,
+ })
+ }
+
+ /// Get the sample size (number of elements per sample)
+ pub fn get_sample_size(&self) -> Option<usize> {
+ self.sample_size
+ }
+
+ /// Read a chunk of data into the provided buffer
+ ///
+ /// Handles leftover data from previous reads and ensures sample
boundaries are respected.
+ /// Returns the number of elements written to the buffer.
+ pub fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+ let mut written = 0;
+ let buf_cap = buffer.len();
+ let calc_limit = |ss: usize| -> usize {
+ if ss == 0 {
+ buf_cap
+ } else {
+ (buf_cap / ss) * ss
+ }
+ };
+ let mut limit = self.sample_size.map_or(buf_cap, calc_limit);
+
+ // Drain leftover data from previous read
+ if self.leftover_cursor < self.leftover_data.len() {
+ let available = self.leftover_data.len() - self.leftover_cursor;
+ let space_left = limit - written;
+ let to_copy = std::cmp::min(available, space_left);
+
+ if to_copy > 0 {
+ buffer[written..written+to_copy].copy_from_slice(
+ &self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy]
+ );
+ written += to_copy;
+ self.leftover_cursor += to_copy;
+ if self.leftover_cursor == self.leftover_data.len() {
+ self.leftover_data.clear();
+ self.leftover_cursor = 0;
+ }
+ }
+ if written == limit {
+ return Ok(written);
+ }
+ }
+
+ // Read new batches from Parquet
+ while written < limit {
+ match self.reader.next() {
+ Some(Ok(batch)) => {
+ if batch.num_columns() == 0 {
+ continue;
+ }
+ let column = batch.column(0);
+
+ let list_array = column
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| MahoutError::Io("Failed to downcast to ListArray".to_string()))?;
+
+ if list_array.len() == 0 {
+ continue;
+ }
+
+ // Extract sample size from first element
+ let first_value = list_array.value(0);
+ let float_array = first_value
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .ok_or_else(|| MahoutError::Io("List values must be Float64".to_string()))?;
+
+ let current_sample_size = float_array.len();
+
+ if self.sample_size.is_none() {
+ self.sample_size = Some(current_sample_size);
+ limit = calc_limit(current_sample_size);
+ }
+
+ // Extract all values from this batch
+ let mut batch_values = Vec::new();
+ for i in 0..list_array.len() {
+ let value_array = list_array.value(i);
+ let float_array = value_array
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .ok_or_else(|| MahoutError::Io("List values must be Float64".to_string()))?;
+
+ if float_array.null_count() == 0 {
+ batch_values.extend_from_slice(float_array.values());
+ } else {
+ batch_values.extend(float_array.iter().map(|opt| opt.unwrap_or(0.0)));
Review Comment:
When handling null values in the Float64Array at line 572, they are replaced
with 0.0. This could silently introduce incorrect data into the quantum
encoding pipeline. Consider returning an error or at least logging a warning
when null values are encountered, as this may indicate data quality issues that
should be addressed at the source.
```suggestion
                            return Err(MahoutError::Io("Null value encountered in Float64Array during quantum encoding. Please check data quality at the source.".to_string()));
```
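If failing hard is considered too strict for this path, a softer alternative (a sketch only; the zero-fill and the row index `i` come from the existing loop, the warning text is illustrative) would at least make the substitution visible:
```rust
// Drop-in alternative for the `else` branch: zero-fill nulls but report them,
// so data-quality issues are not silent.
eprintln!(
    "Warning: {} null value(s) in sample {} replaced with 0.0 during encoding",
    float_array.null_count(),
    i
);
batch_values.extend(float_array.iter().map(|opt| opt.unwrap_or(0.0)));
```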
##########
qdp/qdp-core/src/io.rs:
##########
@@ -438,3 +438,164 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
Ok((all_data, num_samples, sample_size))
}
+
+/// Streaming Parquet reader for List<Float64> columns
+///
+/// Reads Parquet files in chunks without loading entire file into memory.
+/// Supports efficient streaming for large files via Producer-Consumer pattern.
+pub struct ParquetBlockReader {
+ reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+ sample_size: Option<usize>,
+ leftover_data: Vec<f64>,
+ leftover_cursor: usize,
+ pub total_samples: usize,
+}
+
+impl ParquetBlockReader {
+ /// Create a new streaming Parquet reader
+ ///
+ /// Uses batch size 2048 for optimal throughput on large row groups.
+ pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+ let file = File::open(path.as_ref()).map_err(|e| {
+ MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+ })?;
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+ MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+ })?;
+
+ let total_rows = builder.metadata().file_metadata().num_rows() as usize;
+
+ let reader = builder
+ .with_batch_size(2048) // Optimized for large row groups
+ .build()
+ .map_err(|e| {
+ MahoutError::Io(format!("Failed to build Parquet reader: {}", e))
+ })?;
+
+ Ok(Self {
+ reader,
+ sample_size: None,
+ leftover_data: Vec::new(),
+ leftover_cursor: 0,
+ total_samples: total_rows,
+ })
+ }
+
+ /// Get the sample size (number of elements per sample)
+ pub fn get_sample_size(&self) -> Option<usize> {
+ self.sample_size
+ }
+
+ /// Read a chunk of data into the provided buffer
+ ///
+ /// Handles leftover data from previous reads and ensures sample boundaries are respected.
+ /// Returns the number of elements written to the buffer.
+ pub fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+ let mut written = 0;
+ let buf_cap = buffer.len();
+ let calc_limit = |ss: usize| -> usize {
+ if ss == 0 {
+ buf_cap
+ } else {
+ (buf_cap / ss) * ss
+ }
+ };
+ let mut limit = self.sample_size.map_or(buf_cap, calc_limit);
+
+ // Drain leftover data from previous read
+ if self.leftover_cursor < self.leftover_data.len() {
+ let available = self.leftover_data.len() - self.leftover_cursor;
+ let space_left = limit - written;
+ let to_copy = std::cmp::min(available, space_left);
+
+ if to_copy > 0 {
+ buffer[written..written+to_copy].copy_from_slice(
+ &self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy]
+ );
+ written += to_copy;
+ self.leftover_cursor += to_copy;
+ if self.leftover_cursor == self.leftover_data.len() {
+ self.leftover_data.clear();
+ self.leftover_cursor = 0;
+ }
+ }
+ if written == limit {
+ return Ok(written);
+ }
+ }
+
+ // Read new batches from Parquet
+ while written < limit {
+ match self.reader.next() {
+ Some(Ok(batch)) => {
+ if batch.num_columns() == 0 {
+ continue;
+ }
+ let column = batch.column(0);
+
+ let list_array = column
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| MahoutError::Io("Failed to downcast to ListArray".to_string()))?;
+
+ if list_array.len() == 0 {
+ continue;
+ }
+
+ // Extract sample size from first element
+ let first_value = list_array.value(0);
+ let float_array = first_value
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .ok_or_else(|| MahoutError::Io("List values must be Float64".to_string()))?;
+
+ let current_sample_size = float_array.len();
+
+ if self.sample_size.is_none() {
+ self.sample_size = Some(current_sample_size);
+ limit = calc_limit(current_sample_size);
+ }
+
+ // Extract all values from this batch
+ let mut batch_values = Vec::new();
+ for i in 0..list_array.len() {
+ let value_array = list_array.value(i);
+ let float_array = value_array
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .ok_or_else(|| MahoutError::Io("List values must be Float64".to_string()))?;
+
+ if float_array.null_count() == 0 {
+ batch_values.extend_from_slice(float_array.values());
+ } else {
+ batch_values.extend(float_array.iter().map(|opt| opt.unwrap_or(0.0)));
+ }
+ }
+
+ let available = batch_values.len();
+ let space_left = if written >= limit { 0 } else { limit - written };
+
+ if available <= space_left {
+ buffer[written..written+available].copy_from_slice(&batch_values);
+ written += available;
+ } else {
+ if space_left > 0 {
+ buffer[written..written+space_left].copy_from_slice(&batch_values[0..space_left]);
+ written += space_left;
+ }
+ // Save overflow as leftover for next read
+ self.leftover_data.clear();
+ self.leftover_data.extend_from_slice(&batch_values[space_left..]);
+ self.leftover_cursor = 0;
+ break;
+ }
+ },
+ Some(Err(e)) => return Err(MahoutError::Io(format!("Parquet read error: {}", e))),
+ None => break,
+ }
+ }
+
+ Ok(written)
+ }
+}
Review Comment:
The new streaming Parquet reader (ParquetBlockReader) lacks test coverage.
Given that other IO functionality in the codebase has comprehensive tests (as
seen in tests/parquet_io.rs), this new component should also have tests to
verify correct behavior, especially for edge cases like empty files, partial
reads, leftover data handling, and sample boundary alignment.
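For the sample-boundary case in particular, a test could look roughly like this (a sketch; `write_test_parquet` is a hypothetical fixture helper in the spirit of tests/parquet_io.rs, and the 10 x 8 shape is arbitrary):
```rust
#[test]
fn read_chunk_respects_sample_boundaries() {
    // Hypothetical fixture: 10 samples, 8 f64 elements each.
    let path = write_test_parquet(10, 8);
    let mut reader = ParquetBlockReader::new(&path).unwrap();

    // A buffer that is not a multiple of the sample size forces the reader
    // to stop at the last full sample and carry the remainder over.
    let mut buf = vec![0.0f64; 20];
    let written = reader.read_chunk(&mut buf).unwrap();
    assert!(written > 0 && written % 8 == 0, "chunk must end on a sample boundary");

    // The carried-over data must arrive on the next call, not be dropped.
    let mut rest = vec![0.0f64; 80];
    let written_rest = reader.read_chunk(&mut rest).unwrap();
    assert_eq!(written + written_rest, 10 * 8);
}
```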
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]