This is an automated email from the ASF dual-hosted git repository.
guanmingchiu pushed a commit to branch dev-qdp
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/dev-qdp by this push:
new ee9b0e91e [QDP] improve memory management (#708)
ee9b0e91e is described below
commit ee9b0e91e203176e30c2f0323b7ecc17097b4b92
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Thu Dec 18 16:35:53 2025 +0800
[QDP] improve memory management (#708)
* DataLoader-Benchmark
Signed-off-by: 400Ping <[email protected]>
* update output
Signed-off-by: 400Ping <[email protected]>
* update
Signed-off-by: 400Ping <[email protected]>
* [Fix] Remove print in Qiskit
Signed-off-by: 400Ping <[email protected]>
* fix
Signed-off-by: 400Ping <[email protected]>
* fix pre-commit
Signed-off-by: 400Ping <[email protected]>
* [QDP] improve memory management
* fix pre-commit
Signed-off-by: 400Ping <[email protected]>
* update doc
Signed-off-by: 400Ping <[email protected]>
* update doc
Signed-off-by: 400Ping <[email protected]>
* improve memory usage
* improve pool
* improve for memory
* fix back
* merge and improve
* fix error
* memory release
* follow suggestions
* Address reviewer feedback: safety checks, error handling, and code cleanup
* Revert e2e Benchmark
* refactor ffi
* remove redundant check
* fix error
---------
Signed-off-by: 400Ping <[email protected]>
Co-authored-by: 400Ping <[email protected]>
---
qdp/qdp-core/src/gpu/cuda_ffi.rs | 52 ++++++
qdp/qdp-core/src/gpu/encodings/amplitude.rs | 15 +-
qdp/qdp-core/src/gpu/memory.rs | 87 +++++++++-
qdp/qdp-core/src/gpu/mod.rs | 6 +
qdp/qdp-core/src/gpu/pipeline.rs | 102 ++++++++++--
qdp/qdp-core/src/io.rs | 241 ++++++++++++++++++++++++++++
qdp/qdp-core/src/lib.rs | 205 +++++++++++++++++++++--
qdp/qdp-core/tests/arrow_ipc_io.rs | 4 +-
qdp/qdp-core/tests/common/mod.rs | 3 +-
9 files changed, 666 insertions(+), 49 deletions(-)
diff --git a/qdp/qdp-core/src/gpu/cuda_ffi.rs b/qdp/qdp-core/src/gpu/cuda_ffi.rs
new file mode 100644
index 000000000..b61b4e4b2
--- /dev/null
+++ b/qdp/qdp-core/src/gpu/cuda_ffi.rs
@@ -0,0 +1,52 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Centralized CUDA Runtime API FFI declarations.
+
+#![cfg(target_os = "linux")]
+
+use std::ffi::c_void;
+
+pub(crate) const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1;
+pub(crate) const CUDA_EVENT_DISABLE_TIMING: u32 = 0x02;
+
+unsafe extern "C" {
+    pub(crate) fn cudaHostAlloc(pHost: *mut *mut c_void, size: usize, flags: u32) -> i32;
+ pub(crate) fn cudaFreeHost(ptr: *mut c_void) -> i32;
+
+ pub(crate) fn cudaMemGetInfo(free: *mut usize, total: *mut usize) -> i32;
+
+ pub(crate) fn cudaMemcpyAsync(
+ dst: *mut c_void,
+ src: *const c_void,
+ count: usize,
+ kind: u32,
+ stream: *mut c_void,
+ ) -> i32;
+
+    pub(crate) fn cudaEventCreateWithFlags(event: *mut *mut c_void, flags: u32) -> i32;
+    pub(crate) fn cudaEventRecord(event: *mut c_void, stream: *mut c_void) -> i32;
+    pub(crate) fn cudaEventDestroy(event: *mut c_void) -> i32;
+    pub(crate) fn cudaStreamWaitEvent(stream: *mut c_void, event: *mut c_void, flags: u32) -> i32;
+    pub(crate) fn cudaStreamSynchronize(stream: *mut c_void) -> i32;
+
+ pub(crate) fn cudaMemsetAsync(
+ devPtr: *mut c_void,
+ value: i32,
+ count: usize,
+ stream: *mut c_void,
+ ) -> i32;
+}
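For orientation: every routine declared above returns a raw CUDA error code as i32, where 0 means success, and call sites in this patch check for nonzero before proceeding. A minimal sketch of that convention, assuming the crate links against the CUDA runtime (libcudart) on Linux; free_device_memory_bytes is a hypothetical helper, not part of this commit:

    // Sketch only: the checked-call convention used throughout this patch.
    fn free_device_memory_bytes() -> Result<(usize, usize), i32> {
        let mut free: usize = 0;
        let mut total: usize = 0;
        // cudaMemGetInfo writes the device's current free/total memory in bytes.
        let ret = unsafe { cudaMemGetInfo(&mut free, &mut total) };
        if ret != 0 { return Err(ret); }
        Ok((free, total))
    }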
diff --git a/qdp/qdp-core/src/gpu/encodings/amplitude.rs b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
index 715f0984e..70b38895e 100644
--- a/qdp/qdp-core/src/gpu/encodings/amplitude.rs
+++ b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// Amplitude encoding: direct state injection with L2 normalization
+// Amplitude encoding: state injection with L2 normalization
use std::sync::Arc;
@@ -27,6 +27,8 @@ use super::QuantumEncoder;
#[cfg(target_os = "linux")]
use std::ffi::c_void;
#[cfg(target_os = "linux")]
+use crate::gpu::cuda_ffi::cudaMemsetAsync;
+#[cfg(target_os = "linux")]
use cudarc::driver::{DevicePtr, DevicePtrMut};
#[cfg(target_os = "linux")]
use qdp_kernels::{
@@ -279,7 +281,7 @@ impl QuantumEncoder for AmplitudeEncoder {
impl AmplitudeEncoder {
- /// Async pipeline encoding for large data (SSS-tier optimization)
+ /// Async pipeline encoding for large data
///
/// Uses the generic dual-stream pipeline infrastructure to overlap
/// data transfer and computation. The pipeline handles all the
@@ -359,15 +361,6 @@ impl AmplitudeEncoder {
// Zero-fill padding region using CUDA Runtime API
        // Use default stream since pipeline streams are already synchronized
unsafe {
- unsafe extern "C" {
- fn cudaMemsetAsync(
- devPtr: *mut c_void,
- value: i32,
- count: usize,
- stream: *mut c_void,
- ) -> i32;
- }
-
let result = cudaMemsetAsync(
tail_ptr,
0,
diff --git a/qdp/qdp-core/src/gpu/memory.rs b/qdp/qdp-core/src/gpu/memory.rs
index 72bf5bc7f..26e7b1383 100644
--- a/qdp/qdp-core/src/gpu/memory.rs
+++ b/qdp/qdp-core/src/gpu/memory.rs
@@ -26,6 +26,9 @@ pub enum Precision {
Float64,
}
+#[cfg(target_os = "linux")]
+use crate::gpu::cuda_ffi::{cudaFreeHost, cudaHostAlloc, cudaMemGetInfo};
+
#[cfg(target_os = "linux")]
fn bytes_to_mib(bytes: usize) -> f64 {
bytes as f64 / (1024.0 * 1024.0)
@@ -45,10 +48,6 @@ fn cuda_error_to_string(code: i32) -> &'static str {
#[cfg(target_os = "linux")]
fn query_cuda_mem_info() -> Result<(usize, usize)> {
unsafe {
- unsafe extern "C" {
- fn cudaMemGetInfo(free: *mut usize, total: *mut usize) -> i32;
- }
-
let mut free_bytes: usize = 0;
let mut total_bytes: usize = 0;
        let result = cudaMemGetInfo(&mut free_bytes as *mut usize, &mut total_bytes as *mut usize);
@@ -379,3 +378,83 @@ impl GpuStateVector {
}
}
}
+
+// === Pinned Memory Implementation ===
+
+/// Pinned Host Memory Buffer (Page-Locked)
+///
+/// Enables DMA for host-to-device copies, which typically improves transfer
+/// bandwidth and reduces CPU usage compared to pageable memory.
+#[cfg(target_os = "linux")]
+pub struct PinnedBuffer {
+ ptr: *mut f64,
+ size_elements: usize,
+}
+
+#[cfg(target_os = "linux")]
+impl PinnedBuffer {
+ /// Allocate pinned memory
+ pub fn new(elements: usize) -> Result<Self> {
+ unsafe {
+ let bytes = elements
+ .checked_mul(std::mem::size_of::<f64>())
+ .ok_or_else(|| MahoutError::MemoryAllocation(
+                    format!("Requested pinned buffer allocation size overflow (elements={})", elements)
+ ))?;
+ let mut ptr: *mut c_void = std::ptr::null_mut();
+
+            let ret = cudaHostAlloc(&mut ptr, bytes, 0); // cudaHostAllocDefault
+
+ if ret != 0 {
+ return Err(MahoutError::MemoryAllocation(
+ format!("cudaHostAlloc failed with error code: {}", ret)
+ ));
+ }
+
+ Ok(Self {
+ ptr: ptr as *mut f64,
+ size_elements: elements,
+ })
+ }
+ }
+
+ /// Get mutable slice to write data into
+ pub fn as_slice_mut(&mut self) -> &mut [f64] {
+ unsafe { std::slice::from_raw_parts_mut(self.ptr, self.size_elements) }
+ }
+
+ /// Get raw pointer for CUDA memcpy
+ pub fn ptr(&self) -> *const f64 {
+ self.ptr
+ }
+
+ pub fn len(&self) -> usize {
+ self.size_elements
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.size_elements == 0
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl Drop for PinnedBuffer {
+ fn drop(&mut self) {
+ unsafe {
+ let result = cudaFreeHost(self.ptr as *mut c_void);
+ if result != 0 {
+ eprintln!(
+ "Warning: cudaFreeHost failed with error code {} ({})",
+ result,
+ cuda_error_to_string(result)
+ );
+ }
+ }
+ }
+}
+
+// Safety: Pinned memory is accessible from any thread
+#[cfg(target_os = "linux")]
+unsafe impl Send for PinnedBuffer {}
+
+#[cfg(target_os = "linux")]
+unsafe impl Sync for PinnedBuffer {}
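As a usage sketch of the PinnedBuffer lifecycle (assuming a Linux target and an enclosing function that returns the crate's Result; the size is illustrative):

    // Sketch only: stage host data in page-locked memory, then hand the raw
    // pointer to an async H2D copy. Drop releases the allocation via cudaFreeHost.
    let mut staging = PinnedBuffer::new(1 << 20)?;    // 1M f64 elements
    staging.as_slice_mut().fill(0.0);                 // write host-side data
    let _src: *const f64 = staging.ptr();             // pointer for cudaMemcpyAsync
    debug_assert!(!staging.is_empty() && staging.len() == 1 << 20);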
diff --git a/qdp/qdp-core/src/gpu/mod.rs b/qdp/qdp-core/src/gpu/mod.rs
index fe7cdace0..77b41f6e8 100644
--- a/qdp/qdp-core/src/gpu/mod.rs
+++ b/qdp/qdp-core/src/gpu/mod.rs
@@ -18,6 +18,12 @@ pub mod memory;
pub mod encodings;
pub mod pipeline;
+#[cfg(target_os = "linux")]
+pub(crate) mod cuda_ffi;
+
pub use memory::GpuStateVector;
pub use encodings::{QuantumEncoder, AmplitudeEncoder, AngleEncoder, BasisEncoder, get_encoder};
pub use pipeline::run_dual_stream_pipeline;
+
+#[cfg(target_os = "linux")]
+pub use pipeline::PipelineContext;
diff --git a/qdp/qdp-core/src/gpu/pipeline.rs b/qdp/qdp-core/src/gpu/pipeline.rs
index 3c5921c38..bbe74c7b0 100644
--- a/qdp/qdp-core/src/gpu/pipeline.rs
+++ b/qdp/qdp-core/src/gpu/pipeline.rs
@@ -24,7 +24,94 @@ use std::ffi::c_void;
use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
use crate::error::{MahoutError, Result};
#[cfg(target_os = "linux")]
-use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
+use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error, PinnedBuffer};
+
+#[cfg(target_os = "linux")]
+use crate::gpu::cuda_ffi::{
+    cudaEventCreateWithFlags, cudaEventDestroy, cudaEventRecord, cudaMemcpyAsync, cudaStreamSynchronize,
+ cudaStreamWaitEvent, CUDA_EVENT_DISABLE_TIMING, CUDA_MEMCPY_HOST_TO_DEVICE,
+};
+
+/// Dual-stream pipeline context: manages compute/copy streams and sync events
+#[cfg(target_os = "linux")]
+pub struct PipelineContext {
+ pub stream_compute: CudaStream,
+ pub stream_copy: CudaStream,
+ event_copy_done: *mut c_void,
+}
+
+#[cfg(target_os = "linux")]
+impl PipelineContext {
+ /// Create dual streams and sync event
+ pub fn new(device: &Arc<CudaDevice>) -> Result<Self> {
+ let stream_compute = device.fork_default_stream()
+ .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+ let stream_copy = device.fork_default_stream()
+ .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+
+ let mut event_copy_done: *mut c_void = std::ptr::null_mut();
+ unsafe {
+            let ret = cudaEventCreateWithFlags(&mut event_copy_done, CUDA_EVENT_DISABLE_TIMING);
+ if ret != 0 {
+                return Err(MahoutError::Cuda(format!("Failed to create CUDA event: {}", ret)));
+ }
+ }
+
+ Ok(Self {
+ stream_compute,
+ stream_copy,
+ event_copy_done,
+ })
+ }
+
+ /// Async H2D copy on copy stream
+    pub unsafe fn async_copy_to_device(&self, src: &PinnedBuffer, dst: *mut c_void, len_elements: usize) {
+ crate::profile_scope!("GPU::H2D_Copy");
+ unsafe {
+ cudaMemcpyAsync(
+ dst,
+ src.ptr() as *const c_void,
+ len_elements * std::mem::size_of::<f64>(),
+ CUDA_MEMCPY_HOST_TO_DEVICE,
+ self.stream_copy.stream as *mut c_void
+ );
+ }
+ }
+
+ /// Record copy completion event
+ pub unsafe fn record_copy_done(&self) {
+ unsafe {
+            cudaEventRecord(self.event_copy_done, self.stream_copy.stream as *mut c_void);
+ }
+ }
+
+ /// Make compute stream wait for copy completion
+ pub unsafe fn wait_for_copy(&self) {
+ crate::profile_scope!("GPU::StreamWait");
+ unsafe {
+            cudaStreamWaitEvent(self.stream_compute.stream as *mut c_void, self.event_copy_done, 0);
+ }
+ }
+
+ /// Sync copy stream (safe to reuse host buffer)
+ pub unsafe fn sync_copy_stream(&self) {
+ crate::profile_scope!("Pipeline::SyncCopy");
+ unsafe {
+ cudaStreamSynchronize(self.stream_copy.stream as *mut c_void);
+ }
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl Drop for PipelineContext {
+ fn drop(&mut self) {
+ unsafe {
+ if !self.event_copy_done.is_null() {
+ cudaEventDestroy(self.event_copy_done);
+ }
+ }
+ }
+}
/// Chunk processing callback for async pipeline
///
@@ -112,24 +199,11 @@ where
{
crate::profile_scope!("GPU::H2DCopyAsync");
unsafe {
- unsafe extern "C" {
- fn cudaMemcpyAsync(
- dst: *mut c_void,
- src: *const c_void,
- count: usize,
- kind: u32,
- stream: *mut c_void,
- ) -> i32;
- }
-
            let dst_device_ptr = *input_chunk_dev.device_ptr() as *mut c_void;
let src_host_ptr = chunk.as_ptr() as *const c_void;
let bytes = chunk.len() * std::mem::size_of::<f64>();
let stream_handle = current_stream.stream as *mut c_void;
- // cudaMemcpyKind: cudaMemcpyHostToDevice = 1
- const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1;
-
let result = cudaMemcpyAsync(
dst_device_ptr,
src_host_ptr,
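The intended call ordering for PipelineContext, per chunk, is copy, record, wait, launch, sync. A sketch under the same cfg assumptions (device, staging, dev_ptr, and n are placeholders for illustration, not part of this commit):

    // Sketch only: one pipeline iteration.
    let ctx = PipelineContext::new(&device)?;
    unsafe {
        ctx.async_copy_to_device(&staging, dev_ptr, n); // enqueue H2D on copy stream
        ctx.record_copy_done();                         // event on the copy stream
        ctx.wait_for_copy();                            // compute stream waits on event
        // ... launch kernels on ctx.stream_compute here ...
        ctx.sync_copy_stream();                         // host may now reuse `staging`
    }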
diff --git a/qdp/qdp-core/src/io.rs b/qdp/qdp-core/src/io.rs
index 372f4ef75..e7b253626 100644
--- a/qdp/qdp-core/src/io.rs
+++ b/qdp/qdp-core/src/io.rs
@@ -438,3 +438,244 @@ pub fn read_arrow_ipc_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
Ok((all_data, num_samples, sample_size))
}
+
+/// Streaming Parquet reader for List<Float64> and FixedSizeList<Float64> columns
+///
+/// Reads Parquet files in chunks without loading the entire file into memory.
+/// Supports efficient streaming of large files via a producer-consumer pattern.
+pub struct ParquetBlockReader {
+ reader: parquet::arrow::arrow_reader::ParquetRecordBatchReader,
+ sample_size: Option<usize>,
+ leftover_data: Vec<f64>,
+ leftover_cursor: usize,
+ pub total_rows: usize,
+}
+
+impl ParquetBlockReader {
+ /// Create a new streaming Parquet reader
+ ///
+ /// # Arguments
+ /// * `path` - Path to the Parquet file
+ /// * `batch_size` - Optional batch size (defaults to 2048)
+    pub fn new<P: AsRef<Path>>(path: P, batch_size: Option<usize>) -> Result<Self> {
+ let file = File::open(path.as_ref()).map_err(|e| {
+ MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+ })?;
+
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+ MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+ })?;
+
+ let schema = builder.schema();
+ if schema.fields().len() != 1 {
+ return Err(MahoutError::InvalidInput(format!(
+ "Expected exactly one column, got {}",
+ schema.fields().len()
+ )));
+ }
+
+ let field = &schema.fields()[0];
+ match field.data_type() {
+ DataType::List(child_field) => {
+ if !matches!(child_field.data_type(), DataType::Float64) {
+ return Err(MahoutError::InvalidInput(format!(
+ "Expected List<Float64> column, got List<{:?}>",
+ child_field.data_type()
+ )));
+ }
+ }
+ DataType::FixedSizeList(child_field, _) => {
+ if !matches!(child_field.data_type(), DataType::Float64) {
+ return Err(MahoutError::InvalidInput(format!(
+                        "Expected FixedSizeList<Float64> column, got FixedSizeList<{:?}>",
+ child_field.data_type()
+ )));
+ }
+ }
+ _ => {
+ return Err(MahoutError::InvalidInput(format!(
+                    "Expected List<Float64> or FixedSizeList<Float64> column, got {:?}",
+ field.data_type()
+ )));
+ }
+ }
+
+        let total_rows = builder.metadata().file_metadata().num_rows() as usize;
+
+ let batch_size = batch_size.unwrap_or(2048);
+ let reader = builder
+ .with_batch_size(batch_size)
+ .build()
+ .map_err(|e| {
+                MahoutError::Io(format!("Failed to build Parquet reader: {}", e))
+ })?;
+
+ Ok(Self {
+ reader,
+ sample_size: None,
+ leftover_data: Vec::new(),
+ leftover_cursor: 0,
+ total_rows,
+ })
+ }
+
+ /// Get the sample size (number of elements per sample)
+ pub fn get_sample_size(&self) -> Option<usize> {
+ self.sample_size
+ }
+
+ /// Read a chunk of data into the provided buffer
+ ///
+    /// Handles leftover data from previous reads and ensures sample boundaries are respected.
+ /// Returns the number of elements written to the buffer.
+ pub fn read_chunk(&mut self, buffer: &mut [f64]) -> Result<usize> {
+ let mut written = 0;
+ let buf_cap = buffer.len();
+ let calc_limit = |ss: usize| -> usize {
+ if ss == 0 {
+ buf_cap
+ } else {
+ (buf_cap / ss) * ss
+ }
+ };
+ let mut limit = self.sample_size.map_or(buf_cap, calc_limit);
+
+ if self.sample_size.is_some() {
+            while self.leftover_cursor < self.leftover_data.len() && written < limit {
+                let available = self.leftover_data.len() - self.leftover_cursor;
+ let space_left = limit - written;
+ let to_copy = std::cmp::min(available, space_left);
+
+ if to_copy > 0 {
+ buffer[written..written+to_copy].copy_from_slice(
+                        &self.leftover_data[self.leftover_cursor..self.leftover_cursor+to_copy]
+ );
+ written += to_copy;
+ self.leftover_cursor += to_copy;
+
+ if self.leftover_cursor == self.leftover_data.len() {
+ self.leftover_data.clear();
+ self.leftover_cursor = 0;
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
+ while written < limit {
+ match self.reader.next() {
+ Some(Ok(batch)) => {
+ if batch.num_columns() == 0 {
+ continue;
+ }
+ let column = batch.column(0);
+
+                    let (current_sample_size, batch_values) = match column.data_type() {
+ DataType::List(_) => {
+ let list_array = column
+ .as_any()
+ .downcast_ref::<ListArray>()
+                                .ok_or_else(|| MahoutError::Io("Failed to downcast to ListArray".to_string()))?;
+
+ if list_array.len() == 0 {
+ continue;
+ }
+
+ let mut batch_values = Vec::new();
+ let mut current_sample_size = None;
+ for i in 0..list_array.len() {
+ let value_array = list_array.value(i);
+ let float_array = value_array
+ .as_any()
+ .downcast_ref::<Float64Array>()
+                                    .ok_or_else(|| MahoutError::Io("List values must be Float64".to_string()))?;
+
+ if i == 0 {
+                                    current_sample_size = Some(float_array.len());
+ }
+
+ if float_array.null_count() == 0 {
+                                    batch_values.extend_from_slice(float_array.values());
+ } else {
+                                    return Err(MahoutError::Io("Null value encountered in Float64Array during quantum encoding. Please check data quality at the source.".to_string()));
+ }
+ }
+
+                            (current_sample_size.expect("list_array.len() > 0 ensures at least one element"), batch_values)
+ }
+ DataType::FixedSizeList(_, size) => {
+ let list_array = column
+ .as_any()
+ .downcast_ref::<FixedSizeListArray>()
+                                .ok_or_else(|| MahoutError::Io("Failed to downcast to FixedSizeListArray".to_string()))?;
+
+ if list_array.len() == 0 {
+ continue;
+ }
+
+ let current_sample_size = *size as usize;
+
+ let values = list_array.values();
+ let float_array = values
+ .as_any()
+ .downcast_ref::<Float64Array>()
+                                .ok_or_else(|| MahoutError::Io("FixedSizeList values must be Float64".to_string()))?;
+
+ let mut batch_values = Vec::new();
+ if float_array.null_count() == 0 {
+                                batch_values.extend_from_slice(float_array.values());
+ } else {
+                                return Err(MahoutError::Io("Null value encountered in Float64Array during quantum encoding. Please check data quality at the source.".to_string()));
+ }
+
+ (current_sample_size, batch_values)
+ }
+ _ => {
+ return Err(MahoutError::Io(format!(
+                            "Expected List<Float64> or FixedSizeList<Float64>, got {:?}",
+ column.data_type()
+ )));
+ }
+ };
+
+ if self.sample_size.is_none() {
+ self.sample_size = Some(current_sample_size);
+ limit = calc_limit(current_sample_size);
+ } else {
+ if let Some(expected_size) = self.sample_size {
+ if current_sample_size != expected_size {
+ return Err(MahoutError::InvalidInput(format!(
+                                return Err(MahoutError::InvalidInput(format!(
+                                    "Inconsistent sample sizes: expected {}, got {}",
+ expected_size, current_sample_size
+ )));
+ }
+ }
+ }
+
+ let available = batch_values.len();
+ let space_left = limit - written;
+
+ if available <= space_left {
+                        buffer[written..written+available].copy_from_slice(&batch_values);
+ written += available;
+ } else {
+ if space_left > 0 {
+                            buffer[written..written+space_left].copy_from_slice(&batch_values[0..space_left]);
+ written += space_left;
+ }
+ self.leftover_data.clear();
+                    self.leftover_data.extend_from_slice(&batch_values[space_left..]);
+ self.leftover_cursor = 0;
+ break;
+ }
+ },
+                Some(Err(e)) => return Err(MahoutError::Io(format!("Parquet read error: {}", e))),
+ None => break,
+ }
+ }
+
+ Ok(written)
+ }
+}
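A drain-loop sketch for ParquetBlockReader (the path, buffer size, and enclosing function returning the crate's Result are illustrative assumptions). read_chunk returns 0 once the file is exhausted, and the reader emits whole samples only, so each returned length divides evenly by the sample size:

    // Sketch only: stream a Parquet file in sample-aligned chunks.
    let mut reader = ParquetBlockReader::new("data.parquet", None)?;
    let mut buf = vec![0.0f64; 1 << 20];
    loop {
        let n = reader.read_chunk(&mut buf)?;
        if n == 0 { break; }
        let sample_size = reader.get_sample_size().expect("set by first read");
        let _samples = n / sample_size;
        // ... process the samples in &buf[..n] ...
    }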
diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs
index e14e2bab0..429813c26 100644
--- a/qdp/qdp-core/src/lib.rs
+++ b/qdp/qdp-core/src/lib.rs
@@ -26,10 +26,28 @@ pub use error::{MahoutError, Result};
pub use gpu::memory::Precision;
use std::sync::Arc;
+#[cfg(target_os = "linux")]
+use std::ffi::c_void;
+#[cfg(target_os = "linux")]
+use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
+#[cfg(target_os = "linux")]
+use std::thread;
-use cudarc::driver::CudaDevice;
+use cudarc::driver::{CudaDevice, DevicePtr, DevicePtrMut};
use crate::dlpack::DLManagedTensor;
use crate::gpu::get_encoder;
+#[cfg(target_os = "linux")]
+use crate::gpu::memory::{PinnedBuffer, GpuStateVector};
+#[cfg(target_os = "linux")]
+use crate::gpu::PipelineContext;
+#[cfg(target_os = "linux")]
+use qdp_kernels::{launch_l2_norm_batch, launch_amplitude_encode_batch};
+
+/// 512MB staging buffer for large Parquet row groups (reduces fragmentation)
+#[cfg(target_os = "linux")]
+const STAGE_SIZE_BYTES: usize = 512 * 1024 * 1024;
+#[cfg(target_os = "linux")]
+const STAGE_SIZE_ELEMENTS: usize = STAGE_SIZE_BYTES / std::mem::size_of::<f64>();
/// Main entry point for Mahout QDP
///
@@ -134,18 +152,18 @@ impl QdpEngine {
Ok(dlpack_ptr)
}
- /// Load data from Parquet file and encode into quantum state
+ /// Streaming Parquet encoder with multi-threaded IO
///
-    /// Reads Parquet file with List<Float64> column format and encodes all samples
- /// in a single batch operation. Bypasses pandas for maximum performance.
+    /// Uses a producer-consumer pattern: an IO thread reads Parquet while the GPU processes data.
+ /// Double-buffered (ping-pong) for maximum pipeline overlap.
///
/// # Arguments
- /// * `path` - Path to Parquet file
+ /// * `path` - Path to Parquet file with List<Float64> column
/// * `num_qubits` - Number of qubits
- /// * `encoding_method` - Strategy: "amplitude", "angle", or "basis"
+    /// * `encoding_method` - Currently only "amplitude" is supported for streaming
///
/// # Returns
-    /// Single DLPack pointer containing all encoded states (shape: [num_samples, 2^num_qubits])
+ /// DLPack pointer to encoded states [num_samples, 2^num_qubits]
pub fn encode_from_parquet(
&self,
path: &str,
@@ -154,14 +172,171 @@ impl QdpEngine {
) -> Result<*mut DLManagedTensor> {
crate::profile_scope!("Mahout::EncodeFromParquet");
- // Read Parquet directly using Arrow (faster than pandas)
- let (batch_data, num_samples, sample_size) = {
- crate::profile_scope!("IO::ReadParquetBatch");
- crate::io::read_parquet_batch(path)?
- };
+ #[cfg(target_os = "linux")]
+ {
+ if encoding_method != "amplitude" {
+                return Err(MahoutError::NotImplemented("Only amplitude encoding supported for streaming".into()));
+ }
- // Encode using fused batch kernel
-        self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, encoding_method)
+            let mut reader_core = crate::io::ParquetBlockReader::new(path, None)?;
+ let num_samples = reader_core.total_rows;
+
+            let total_state_vector = GpuStateVector::new_batch(&self.device, num_samples, num_qubits)?;
+ let ctx = PipelineContext::new(&self.device)?;
+
+            let dev_in_a = unsafe { self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?;
+            let dev_in_b = unsafe { self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
+                .map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?;
+
+            let (full_buf_tx, full_buf_rx): (SyncSender<std::result::Result<(PinnedBuffer, usize), MahoutError>>, Receiver<std::result::Result<(PinnedBuffer, usize), MahoutError>>) = sync_channel(2);
+            let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>, Receiver<PinnedBuffer>) = sync_channel(2);
+
+ let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+            let first_len = reader_core.read_chunk(host_buf_first.as_slice_mut())?;
+
+ let sample_size = reader_core.get_sample_size()
+                .ok_or_else(|| MahoutError::InvalidInput("Could not determine sample size".into()))?;
+
+ if sample_size == 0 {
+                return Err(MahoutError::InvalidInput("Sample size cannot be zero".into()));
+ }
+
+ full_buf_tx.send(Ok((host_buf_first, first_len)))
+                .map_err(|_| MahoutError::Io("Failed to send first buffer".into()))?;
+
+ empty_buf_tx.send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+                .map_err(|_| MahoutError::Io("Failed to send second buffer".into()))?;
+
+ let mut reader = reader_core;
+ let io_handle = thread::spawn(move || {
+ loop {
+ let mut buffer = match empty_buf_rx.recv() {
+ Ok(b) => b,
+ Err(_) => break,
+ };
+
+                    let result = reader.read_chunk(buffer.as_slice_mut()).map(|len| (buffer, len));
+
+ let should_break = match &result {
+ Ok((_, len)) => *len == 0,
+ Err(_) => true,
+ };
+
+ if full_buf_tx.send(result).is_err() { break; }
+
+ if should_break { break; }
+ }
+ });
+
+ let mut global_sample_offset: usize = 0;
+ let mut use_dev_a = true;
+ let state_len_per_sample = 1 << num_qubits;
+
+ loop {
+ let (host_buffer, current_len) = match full_buf_rx.recv() {
+ Ok(Ok((buffer, len))) => (buffer, len),
+ Ok(Err(e)) => return Err(e),
+                    Err(_) => return Err(MahoutError::Io("IO thread disconnected".into())),
+ };
+
+ if current_len == 0 { break; }
+
+ if current_len % sample_size != 0 {
+ return Err(MahoutError::InvalidInput(format!(
+ "Chunk length {} is not a multiple of sample size {}",
+ current_len, sample_size
+ )));
+ }
+
+ let samples_in_chunk = current_len / sample_size;
+ if samples_in_chunk > 0 {
+                    let dev_ptr = if use_dev_a { *dev_in_a.device_ptr() } else { *dev_in_b.device_ptr() };
+
+ unsafe {
+ crate::profile_scope!("GPU::Dispatch");
+
+                        ctx.async_copy_to_device(&host_buffer, dev_ptr as *mut c_void, current_len);
+ ctx.record_copy_done();
+ ctx.wait_for_copy();
+
+ {
+ crate::profile_scope!("GPU::BatchEncode");
+ let offset_elements = global_sample_offset
+ .checked_mul(state_len_per_sample)
+ .ok_or_else(|| MahoutError::MemoryAllocation(
+                                    format!("Offset calculation overflow: {} * {}", global_sample_offset, state_len_per_sample)
+ ))?;
+
+ let offset_bytes = offset_elements
+                                .checked_mul(std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+ .ok_or_else(|| MahoutError::MemoryAllocation(
+                                    format!("Offset bytes calculation overflow: {} * {}", offset_elements, std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+ ))?;
+
+                            let state_ptr_offset = total_state_vector.ptr_void().cast::<u8>()
+ .add(offset_bytes)
+ .cast::<std::ffi::c_void>();
+
+                            let mut norm_buffer = self.device.alloc_zeros::<f64>(samples_in_chunk)
+                                .map_err(|e| MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", e)))?;
+
+ {
+ crate::profile_scope!("GPU::NormBatch");
+ let ret = launch_l2_norm_batch(
+ dev_ptr as *const f64,
+ samples_in_chunk,
+ sample_size,
+ *norm_buffer.device_ptr_mut() as *mut f64,
+ ctx.stream_compute.stream as *mut c_void
+ );
+ if ret != 0 {
+                                    return Err(MahoutError::KernelLaunch(format!("Norm kernel error: {}", ret)));
+ }
+ }
+
+ {
+ crate::profile_scope!("GPU::EncodeBatch");
+ let ret = launch_amplitude_encode_batch(
+ dev_ptr as *const f64,
+ state_ptr_offset,
+ *norm_buffer.device_ptr() as *const f64,
+ samples_in_chunk,
+ sample_size,
+ state_len_per_sample,
+ ctx.stream_compute.stream as *mut c_void
+ );
+ if ret != 0 {
+                                    return Err(MahoutError::KernelLaunch(format!("Encode kernel error: {}", ret)));
+ }
+ }
+ }
+
+ ctx.sync_copy_stream();
+ }
+ global_sample_offset = global_sample_offset
+ .checked_add(samples_in_chunk)
+ .ok_or_else(|| MahoutError::MemoryAllocation(
+                            format!("Sample offset overflow: {} + {}", global_sample_offset, samples_in_chunk)
+ ))?;
+ use_dev_a = !use_dev_a;
+ }
+
+ let _ = empty_buf_tx.send(host_buffer);
+ }
+
+            self.device.synchronize().map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+            io_handle.join().map_err(|e| MahoutError::Io(format!("IO thread panicked: {:?}", e)))?;
+
+ let dlpack_ptr = total_state_vector.to_dlpack();
+ Ok(dlpack_ptr)
+ }
+
+ #[cfg(not(target_os = "linux"))]
+ {
+            let (batch_data, num_samples, sample_size) = crate::io::read_parquet_batch(path)?;
+            self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, encoding_method)
+ }
}
/// Load data from Arrow IPC file and encode into quantum state
@@ -185,13 +360,11 @@ impl QdpEngine {
) -> Result<*mut DLManagedTensor> {
crate::profile_scope!("Mahout::EncodeFromArrowIPC");
- // Read Arrow IPC (6x faster than Parquet)
let (batch_data, num_samples, sample_size) = {
crate::profile_scope!("IO::ReadArrowIPCBatch");
crate::io::read_arrow_ipc_batch(path)?
};
- // Encode using fused batch kernel
        self.encode_batch(&batch_data, num_samples, sample_size, num_qubits, encoding_method)
}
}
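The channel handshake in encode_from_parquet generalizes beyond CUDA; here is a self-contained sketch of the same ping-pong pattern with plain Vec<u8> standing in for PinnedBuffer (all names are illustrative, not part of this commit):

    use std::sync::mpsc::sync_channel;
    use std::thread;

    fn main() {
        // Full buffers flow producer -> consumer; drained buffers flow back.
        let (full_tx, full_rx) = sync_channel::<(Vec<u8>, usize)>(2);
        let (empty_tx, empty_rx) = sync_channel::<Vec<u8>>(2);

        // Prime the pipeline with two reusable buffers (double buffering).
        empty_tx.send(vec![0u8; 4]).unwrap();
        empty_tx.send(vec![0u8; 4]).unwrap();

        let producer = thread::spawn(move || {
            let mut remaining = 10usize; // stand-in for a file
            while let Ok(mut buf) = empty_rx.recv() {
                let n = remaining.min(buf.len());
                buf[..n].fill(1);        // stand-in for a read
                remaining -= n;
                // A zero-length chunk signals end-of-stream, as in the code above.
                if full_tx.send((buf, n)).is_err() || n == 0 { break; }
            }
        });

        let mut total = 0;
        while let Ok((buf, n)) = full_rx.recv() {
            if n == 0 { break; }
            total += n;                  // stand-in for GPU work
            let _ = empty_tx.send(buf);  // return the buffer for reuse
        }
        producer.join().unwrap();
        assert_eq!(total, 10);
    }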
diff --git a/qdp/qdp-core/tests/arrow_ipc_io.rs b/qdp/qdp-core/tests/arrow_ipc_io.rs
index 6ef206954..1a9289c08 100644
--- a/qdp/qdp-core/tests/arrow_ipc_io.rs
+++ b/qdp/qdp-core/tests/arrow_ipc_io.rs
@@ -14,15 +14,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use qdp_core::io::{read_arrow_ipc_batch, read_parquet_batch};
+use qdp_core::io::read_arrow_ipc_batch;
use arrow::array::{Float64Array, FixedSizeListArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::writer::FileWriter as ArrowFileWriter;
use std::fs::{self, File};
use std::sync::Arc;
-mod common;
-
#[test]
fn test_read_arrow_ipc_fixed_size_list() {
let temp_path = "/tmp/test_arrow_ipc_fixed.arrow";
diff --git a/qdp/qdp-core/tests/common/mod.rs b/qdp/qdp-core/tests/common/mod.rs
index f105a5436..9afb31e40 100644
--- a/qdp/qdp-core/tests/common/mod.rs
+++ b/qdp/qdp-core/tests/common/mod.rs
@@ -14,7 +14,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-/// Create test data with normalized values
+/// Creates normalized test data
+#[allow(dead_code)] // Used by multiple test modules
pub fn create_test_data(size: usize) -> Vec<f64> {
(0..size).map(|i| (i as f64) / (size as f64)).collect()
}