This is an automated email from the ASF dual-hosted git repository.
guanmingchiu pushed a commit to branch dev-qdp
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/dev-qdp by this push:
new 6ca875d56 [QDP] Double-buffered pinned I/O pipeline and faster Parquet decode (#751)
6ca875d56 is described below
commit 6ca875d562e80a0e54a82083e8d2fcd7c27281cb
Author: Ping <[email protected]>
AuthorDate: Mon Jan 5 01:48:14 2026 +0800
[QDP] Double-buffered pinned I/O pipeline and faster Parquet decode (#751)
* Double-buffered async I/O for read_parquet_batch
Signed-off-by: 400Ping <[email protected]>
* update
Signed-off-by: 400Ping <[email protected]>
* fix python binding error
Signed-off-by: 400Ping <[email protected]>
* update
Signed-off-by: 400Ping <[email protected]>
* update
Signed-off-by: 400Ping <[email protected]>
* update
Signed-off-by: 400Ping <[email protected]>
* update
Signed-off-by: 400Ping <[email protected]>
* update
Signed-off-by: 400Ping <[email protected]>
* fix build error
Signed-off-by: 400Ping <[email protected]>
* Revert "fix build error"
This reverts commit 3556b5ad72418d9f31c9449b03df7682fd8018d9.
* fix build errors
Signed-off-by: 400Ping <[email protected]>
* update unit test and boundary check
Signed-off-by: 400Ping <[email protected]>
* remove improvement 2
Signed-off-by: 400Ping <[email protected]>
* fix qdp-core error
Signed-off-by: 400Ping <[email protected]>
* fix pre-commit
Signed-off-by: 400Ping <[email protected]>
* [Fix] fix pre-commit errors & warnings
Signed-off-by: 400Ping <[email protected]>
* fix rust linters
Signed-off-by: 400Ping <[email protected]>
* [Fix] handle buffer pool lock poisoning
Signed-off-by: 400Ping <[email protected]>
* [Chore] fix rust linters
Signed-off-by: 400Ping <[email protected]>
* update
Signed-off-by: 400Ping <[email protected]>
* Remove unused func
Signed-off-by: 400Ping <[email protected]>
* update
Signed-off-by: 400Ping <[email protected]>
---------
Signed-off-by: 400Ping <[email protected]>
---
qdp/qdp-core/src/gpu/buffer_pool.rs | 151 +++++++++++++++++
qdp/qdp-core/src/gpu/memory.rs | 21 ++-
qdp/qdp-core/src/gpu/mod.rs | 4 +
qdp/qdp-core/src/gpu/pipeline.rs | 270 ++++++++++++++++++------------
qdp/qdp-core/src/lib.rs | 59 +++----
qdp/qdp-kernels/tests/amplitude_encode.rs | 4 +-
6 files changed, 360 insertions(+), 149 deletions(-)
diff --git a/qdp/qdp-core/src/gpu/buffer_pool.rs b/qdp/qdp-core/src/gpu/buffer_pool.rs
new file mode 100644
index 000000000..6604594be
--- /dev/null
+++ b/qdp/qdp-core/src/gpu/buffer_pool.rs
@@ -0,0 +1,151 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Reusable pool of pinned host buffers for staging Disk → Host → GPU transfers.
+//! Intended for producer/consumer pipelines that need a small, fixed set of
+//! page-locked buffers to avoid repeated cudaHostAlloc / cudaFreeHost.
+
+use std::sync::{Arc, Condvar, Mutex, MutexGuard};
+
+use crate::error::{MahoutError, Result};
+use crate::gpu::memory::PinnedHostBuffer;
+
+/// Handle that automatically returns a buffer to the pool on drop.
+#[cfg(target_os = "linux")]
+pub struct PinnedBufferHandle {
+ buffer: Option<PinnedHostBuffer>,
+ pool: Arc<PinnedBufferPool>,
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::Deref for PinnedBufferHandle {
+ type Target = PinnedHostBuffer;
+
+ fn deref(&self) -> &Self::Target {
+ self.buffer
+ .as_ref()
+ .expect("Buffer already returned to pool")
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl std::ops::DerefMut for PinnedBufferHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ self.buffer
+ .as_mut()
+ .expect("Buffer already returned to pool")
+ }
+}
+
+#[cfg(target_os = "linux")]
+impl Drop for PinnedBufferHandle {
+ fn drop(&mut self) {
+ if let Some(buf) = self.buffer.take() {
+ let mut free = self.pool.lock_free();
+ free.push(buf);
+ self.pool.available_cv.notify_one();
+ }
+ }
+}
+
+/// Pool of pinned host buffers sized for a fixed batch shape.
+#[cfg(target_os = "linux")]
+pub struct PinnedBufferPool {
+ free: Mutex<Vec<PinnedHostBuffer>>,
+ available_cv: Condvar,
+ capacity: usize,
+ elements_per_buffer: usize,
+}
+
+#[cfg(target_os = "linux")]
+impl PinnedBufferPool {
+ /// Create a pool with `pool_size` pinned buffers, each sized for `elements_per_buffer` f64 values.
+ pub fn new(pool_size: usize, elements_per_buffer: usize) -> Result<Arc<Self>> {
+ if pool_size == 0 {
+ return Err(MahoutError::InvalidInput(
+ "PinnedBufferPool requires at least one buffer".to_string(),
+ ));
+ }
+ if elements_per_buffer == 0 {
+ return Err(MahoutError::InvalidInput(
+ "PinnedBufferPool buffer size must be greater than
zero".to_string(),
+ ));
+ }
+
+ let mut buffers = Vec::with_capacity(pool_size);
+ for _ in 0..pool_size {
+ buffers.push(PinnedHostBuffer::new(elements_per_buffer)?);
+ }
+
+ Ok(Arc::new(Self {
+ free: Mutex::new(buffers),
+ available_cv: Condvar::new(),
+ capacity: pool_size,
+ elements_per_buffer,
+ }))
+ }
+
+ fn lock_free(&self) -> MutexGuard<'_, Vec<PinnedHostBuffer>> {
+ // Ignore poisoning to keep the pool usable after a panic elsewhere.
+ self.free
+ .lock()
+ .unwrap_or_else(|poisoned| poisoned.into_inner())
+ }
+
+ /// Acquire a pinned buffer, blocking until one is available.
+ pub fn acquire(self: &Arc<Self>) -> PinnedBufferHandle {
+ let mut free = self.lock_free();
+ loop {
+ if let Some(buffer) = free.pop() {
+ return PinnedBufferHandle {
+ buffer: Some(buffer),
+ pool: Arc::clone(self),
+ };
+ }
+ free = self
+ .available_cv
+ .wait(free)
+ .unwrap_or_else(|poisoned| poisoned.into_inner());
+ }
+ }
+
+ /// Try to acquire a pinned buffer from the pool.
+ ///
+ /// Returns `None` if the pool is currently empty; callers can choose to spin/wait
+ /// or fall back to synchronous paths.
+ pub fn try_acquire(self: &Arc<Self>) -> Option<PinnedBufferHandle> {
+ let mut free = self.lock_free();
+ free.pop().map(|buffer| PinnedBufferHandle {
+ buffer: Some(buffer),
+ pool: Arc::clone(self),
+ })
+ }
+
+ /// Number of buffers currently available.
+ pub fn available(&self) -> usize {
+ self.lock_free().len()
+ }
+
+ /// Total number of buffers managed by this pool.
+ pub fn capacity(&self) -> usize {
+ self.capacity
+ }
+
+ /// Fixed element capacity for each buffer in the pool.
+ pub fn elements_per_buffer(&self) -> usize {
+ self.elements_per_buffer
+ }
+}
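As a reading aid, here is a minimal sketch of how a call site might drive the new pool, written as if inside qdp-core; the stage_chunk helper and its elided cudaMemcpyAsync step are illustrative only, not part of this commit:

    use std::sync::Arc;
    use crate::error::Result;
    use crate::gpu::buffer_pool::PinnedBufferPool;
    // Hypothetical helper: acquire a pinned staging buffer, fill it from a chunk,
    // and rely on the handle's Drop to return the buffer and wake a waiting thread.
    fn stage_chunk(pool: &Arc<PinnedBufferPool>, chunk: &[f64]) -> Result<()> {
        let mut staging = pool.acquire();            // blocks until a buffer is free
        staging.as_slice_mut()[..chunk.len()].copy_from_slice(chunk);
        // ... issue the async H2D copy from staging.ptr() on the copy stream ...
        Ok(())                                       // dropping `staging` recycles it
    }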
diff --git a/qdp/qdp-core/src/gpu/memory.rs b/qdp/qdp-core/src/gpu/memory.rs
index 5944c8671..07ec86583 100644
--- a/qdp/qdp-core/src/gpu/memory.rs
+++ b/qdp/qdp-core/src/gpu/memory.rs
@@ -17,6 +17,7 @@ use crate::error::{MahoutError, Result};
use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr};
use qdp_kernels::{CuComplex, CuDoubleComplex};
use std::ffi::c_void;
+#[cfg(target_os = "linux")]
use std::sync::Arc;
/// Precision of the GPU state vector.
@@ -205,6 +206,7 @@ pub struct GpuStateVector {
pub size_elements: usize,
/// Batch size (None for single state)
pub(crate) num_samples: Option<usize>,
+ /// CUDA device ordinal
pub device_id: usize,
}
@@ -446,17 +448,17 @@ impl GpuStateVector {
// === Pinned Memory Implementation ===
-/// Pinned Host Memory Buffer (Page-Locked)
+/// Pinned Host Memory Buffer (owned allocation).
///
-/// Enables DMA for H2D copies, doubling bandwidth and reducing CPU usage.
+/// Allocates page-locked memory to maximize H2D throughput in streaming IO paths.
#[cfg(target_os = "linux")]
-pub struct PinnedBuffer {
+pub struct PinnedHostBuffer {
ptr: *mut f64,
size_elements: usize,
}
#[cfg(target_os = "linux")]
-impl PinnedBuffer {
+impl PinnedHostBuffer {
/// Allocate pinned memory
pub fn new(elements: usize) -> Result<Self> {
unsafe {
@@ -491,6 +493,11 @@ impl PinnedBuffer {
unsafe { std::slice::from_raw_parts_mut(self.ptr, self.size_elements) }
}
+ /// Immutable slice view of the pinned region
+ pub fn as_slice(&self) -> &[f64] {
+ unsafe { std::slice::from_raw_parts(self.ptr, self.size_elements) }
+ }
+
/// Get raw pointer for CUDA memcpy
pub fn ptr(&self) -> *const f64 {
self.ptr
@@ -506,7 +513,7 @@ impl PinnedBuffer {
}
#[cfg(target_os = "linux")]
-impl Drop for PinnedBuffer {
+impl Drop for PinnedHostBuffer {
fn drop(&mut self) {
unsafe {
let result = cudaFreeHost(self.ptr as *mut c_void);
@@ -523,7 +530,7 @@ impl Drop for PinnedBuffer {
// Safety: Pinned memory is accessible from any thread
#[cfg(target_os = "linux")]
-unsafe impl Send for PinnedBuffer {}
+unsafe impl Send for PinnedHostBuffer {}
#[cfg(target_os = "linux")]
-unsafe impl Sync for PinnedBuffer {}
+unsafe impl Sync for PinnedHostBuffer {}
diff --git a/qdp/qdp-core/src/gpu/mod.rs b/qdp/qdp-core/src/gpu/mod.rs
index c42fe1afe..451da1498 100644
--- a/qdp/qdp-core/src/gpu/mod.rs
+++ b/qdp/qdp-core/src/gpu/mod.rs
@@ -14,6 +14,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+#[cfg(target_os = "linux")]
+pub mod buffer_pool;
pub mod encodings;
pub mod memory;
pub mod pipeline;
@@ -21,6 +23,8 @@ pub mod pipeline;
#[cfg(target_os = "linux")]
pub(crate) mod cuda_ffi;
+#[cfg(target_os = "linux")]
+pub use buffer_pool::{PinnedBufferHandle, PinnedBufferPool};
pub use encodings::{AmplitudeEncoder, AngleEncoder, BasisEncoder, QuantumEncoder, get_encoder};
pub use memory::GpuStateVector;
pub use pipeline::run_dual_stream_pipeline;
diff --git a/qdp/qdp-core/src/gpu/pipeline.rs b/qdp/qdp-core/src/gpu/pipeline.rs
index 8c72bf3fa..5acb7d32b 100644
--- a/qdp/qdp-core/src/gpu/pipeline.rs
+++ b/qdp/qdp-core/src/gpu/pipeline.rs
@@ -21,118 +21,177 @@
use crate::error::{MahoutError, Result};
#[cfg(target_os = "linux")]
-use crate::gpu::memory::{PinnedBuffer, ensure_device_memory_available, map_allocation_error};
-use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
-use std::ffi::c_void;
-use std::sync::Arc;
-
+use crate::gpu::buffer_pool::{PinnedBufferHandle, PinnedBufferPool};
#[cfg(target_os = "linux")]
use crate::gpu::cuda_ffi::{
CUDA_EVENT_DISABLE_TIMING, CUDA_MEMCPY_HOST_TO_DEVICE, cudaEventCreateWithFlags,
cudaEventDestroy, cudaEventRecord, cudaMemcpyAsync, cudaStreamSynchronize, cudaStreamWaitEvent,
};
+#[cfg(target_os = "linux")]
+use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
+use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
+use std::ffi::c_void;
+use std::sync::Arc;
-/// Dual-stream pipeline context: manages compute/copy streams and sync events
+/// Dual-stream context coordinating copy/compute with an event.
#[cfg(target_os = "linux")]
pub struct PipelineContext {
pub stream_compute: CudaStream,
pub stream_copy: CudaStream,
- event_copy_done: *mut c_void,
+ events_copy_done: Vec<*mut c_void>,
+}
+
+#[cfg(target_os = "linux")]
+fn validate_event_slot(events: &[*mut c_void], slot: usize) -> Result<()> {
+ if slot >= events.len() {
+ return Err(MahoutError::InvalidInput(format!(
+ "Event slot {} out of range (max: {})",
+ slot,
+ events.len().saturating_sub(1)
+ )));
+ }
+ Ok(())
}
#[cfg(target_os = "linux")]
+#[allow(unsafe_op_in_unsafe_fn)]
impl PipelineContext {
- /// Create dual streams and sync event
- pub fn new(device: &Arc<CudaDevice>) -> Result<Self> {
+ pub fn new(device: &Arc<CudaDevice>, event_slots: usize) -> Result<Self> {
let stream_compute = device
.fork_default_stream()
- .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+ .map_err(|e| MahoutError::Cuda(format!("Failed to create compute
stream: {:?}", e)))?;
let stream_copy = device
.fork_default_stream()
- .map_err(|e| MahoutError::Cuda(format!("{:?}", e)))?;
+ .map_err(|e| MahoutError::Cuda(format!("Failed to create copy
stream: {:?}", e)))?;
- let mut event_copy_done: *mut c_void = std::ptr::null_mut();
- unsafe {
- let ret = cudaEventCreateWithFlags(&mut event_copy_done, CUDA_EVENT_DISABLE_TIMING);
- if ret != 0 {
- return Err(MahoutError::Cuda(format!(
- "Failed to create CUDA event: {}",
- ret
- )));
+ let mut events_copy_done = Vec::with_capacity(event_slots);
+ for _ in 0..event_slots {
+ let mut ev: *mut c_void = std::ptr::null_mut();
+ unsafe {
+ let ret = cudaEventCreateWithFlags(&mut ev, CUDA_EVENT_DISABLE_TIMING);
+ if ret != 0 {
+ return Err(MahoutError::Cuda(format!(
+ "Failed to create CUDA event: {}",
+ ret
+ )));
+ }
}
+ events_copy_done.push(ev);
}
Ok(Self {
stream_compute,
stream_copy,
- event_copy_done,
+ events_copy_done,
})
}
- /// Async H2D copy on copy stream
+ /// Async H2D copy on the copy stream.
///
/// # Safety
- ///
- /// The caller must ensure that:
- /// - `dst` points to valid device memory of at least `len_elements * sizeof(f64)` bytes
- /// - `src` is a valid pinned buffer with at least `len_elements` elements
- /// - The memory regions do not overlap in an undefined way
- /// - The CUDA stream is valid and properly initialized
+ /// `src` must be valid for `len_elements` `f64` values and properly aligned.
+ /// `dst` must point to device memory for `len_elements` `f64` values on the same device.
+ /// Both pointers must remain valid until the copy completes on `stream_copy`.
pub unsafe fn async_copy_to_device(
&self,
- src: &PinnedBuffer,
+ src: *const c_void,
dst: *mut c_void,
len_elements: usize,
- ) {
+ ) -> Result<()> {
crate::profile_scope!("GPU::H2D_Copy");
- unsafe {
- cudaMemcpyAsync(
- dst,
- src.ptr() as *const c_void,
- len_elements * std::mem::size_of::<f64>(),
- CUDA_MEMCPY_HOST_TO_DEVICE,
- self.stream_copy.stream as *mut c_void,
- );
+ let ret = cudaMemcpyAsync(
+ dst,
+ src,
+ len_elements * std::mem::size_of::<f64>(),
+ CUDA_MEMCPY_HOST_TO_DEVICE,
+ self.stream_copy.stream as *mut c_void,
+ );
+ if ret != 0 {
+ return Err(MahoutError::Cuda(format!(
+ "Async H2D copy failed with CUDA error: {}",
+ ret
+ )));
}
+ Ok(())
}
- /// Record copy completion event
+ /// Record completion of the copy on the copy stream.
///
/// # Safety
- ///
- /// The caller must ensure that the CUDA event and stream are valid and properly initialized.
- pub unsafe fn record_copy_done(&self) {
- unsafe {
- cudaEventRecord(self.event_copy_done, self.stream_copy.stream as *mut c_void);
+ /// `slot` must refer to a live event created by this context, and the context must
+ /// remain alive until the event is no longer used by any stream.
+ pub unsafe fn record_copy_done(&self, slot: usize) -> Result<()> {
+ validate_event_slot(&self.events_copy_done, slot)?;
+
+ let ret = cudaEventRecord(
+ self.events_copy_done[slot],
+ self.stream_copy.stream as *mut c_void,
+ );
+ if ret != 0 {
+ return Err(MahoutError::Cuda(format!(
+ "cudaEventRecord failed: {}",
+ ret
+ )));
}
+ Ok(())
}
- /// Make compute stream wait for copy completion
+ /// Make compute stream wait for the copy completion event.
///
/// # Safety
- ///
- /// The caller must ensure that the compute stream and copy event are valid and properly initialized.
- pub unsafe fn wait_for_copy(&self) {
+ /// `slot` must refer to a live event previously recorded on `stream_copy`, and the
+ /// context and its streams must remain valid while waiting.
+ pub unsafe fn wait_for_copy(&self, slot: usize) -> Result<()> {
crate::profile_scope!("GPU::StreamWait");
- unsafe {
- cudaStreamWaitEvent(
- self.stream_compute.stream as *mut c_void,
- self.event_copy_done,
- 0,
- );
+ validate_event_slot(&self.events_copy_done, slot)?;
+
+ let ret = cudaStreamWaitEvent(
+ self.stream_compute.stream as *mut c_void,
+ self.events_copy_done[slot],
+ 0,
+ );
+ if ret != 0 {
+ return Err(MahoutError::Cuda(format!(
+ "cudaStreamWaitEvent failed: {}",
+ ret
+ )));
}
+ Ok(())
}
- /// Sync copy stream (safe to reuse host buffer)
+ /// Sync copy stream (safe to reuse host buffer).
///
/// # Safety
- ///
- /// The caller must ensure that the copy stream is valid and properly initialized.
- pub unsafe fn sync_copy_stream(&self) {
+ /// The context and its copy stream must be valid and not destroyed while syncing.
+ pub unsafe fn sync_copy_stream(&self) -> Result<()> {
crate::profile_scope!("Pipeline::SyncCopy");
- unsafe {
- cudaStreamSynchronize(self.stream_copy.stream as *mut c_void);
+ let ret = cudaStreamSynchronize(self.stream_copy.stream as *mut c_void);
+ if ret != 0 {
+ return Err(MahoutError::Cuda(format!(
+ "cudaStreamSynchronize(copy) failed: {}",
+ ret
+ )));
}
+ Ok(())
+ }
+}
+
+#[cfg(all(test, target_os = "linux"))]
+mod tests {
+ use super::validate_event_slot;
+
+ #[test]
+ fn validate_event_slot_allows_in_range() {
+ let events = vec![std::ptr::null_mut(); 2];
+ assert!(validate_event_slot(&events, 0).is_ok());
+ assert!(validate_event_slot(&events, 1).is_ok());
+ }
+
+ #[test]
+ fn validate_event_slot_rejects_out_of_range() {
+ let events = vec![std::ptr::null_mut(); 2];
+ let err = validate_event_slot(&events, 2).unwrap_err();
+ assert!(matches!(err, crate::error::MahoutError::InvalidInput(_)));
}
}
@@ -140,22 +199,15 @@ impl PipelineContext {
impl Drop for PipelineContext {
fn drop(&mut self) {
unsafe {
- if !self.event_copy_done.is_null() {
- cudaEventDestroy(self.event_copy_done);
+ for ev in &mut self.events_copy_done {
+ if !ev.is_null() {
+ let _ = cudaEventDestroy(*ev);
+ }
}
}
}
}
-/// Chunk processing callback for async pipeline
-///
-/// This closure is called for each chunk with:
-/// - `stream`: The CUDA stream to launch the kernel on
-/// - `input_ptr`: Device pointer to the chunk data (already copied)
-/// - `chunk_offset`: Global offset in the original data (in elements)
-/// - `chunk_len`: Length of this chunk (in elements)
-pub type ChunkProcessor = dyn FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>;
-
/// Executes a task using dual-stream double-buffering pattern
///
/// This function handles the generic pipeline mechanics:
@@ -191,29 +243,29 @@ where
{
crate::profile_scope!("GPU::AsyncPipeline");
- // 1. Create dual streams for pipeline overlap
- let stream1 = device
- .fork_default_stream()
- .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 1:
{:?}", e)))?;
- let stream2 = device
- .fork_default_stream()
- .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 2:
{:?}", e)))?;
- let streams = [&stream1, &stream2];
+ // Pinned host staging pool sized to the current chunking strategy (double-buffer by default).
+ const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 / std::mem::size_of::<f64>(); // 8MB
+ const PINNED_POOL_SIZE: usize = 2; // double buffering
+ // 1. Create dual streams with per-slot events to coordinate copy -> compute
+ let ctx = PipelineContext::new(device, PINNED_POOL_SIZE)?;
+ let pinned_pool = PinnedBufferPool::new(PINNED_POOL_SIZE, CHUNK_SIZE_ELEMENTS)
+ .map_err(|e| MahoutError::Cuda(format!("Failed to create pinned buffer pool: {}", e)))?;
// 2. Chunk size: 8MB per chunk (balance between overhead and overlap opportunity)
- // TODO: we should tune this dynamically based on the detected GPU model or PCIe bandwidth in the future.
- // Too small = launch overhead dominates, too large = less overlap
- const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 / std::mem::size_of::<f64>(); // 8MB
+ // TODO: tune dynamically based on GPU/PCIe bandwidth.
// 3. Keep temporary buffers alive until all streams complete
// This prevents Rust from dropping them while GPU is still using them
let mut keep_alive_buffers: Vec<CudaSlice<f64>> = Vec::new();
+ // Keep pinned buffers alive until the copy stream has completed their H2D copy
+ let mut in_flight_pinned: Vec<PinnedBufferHandle> = Vec::new();
let mut global_offset = 0;
- // 4. Pipeline loop: alternate between streams for maximum overlap
+ // 4. Pipeline loop: copy on copy stream, compute on compute stream with event handoff
for (chunk_idx, chunk) in host_data.chunks(CHUNK_SIZE_ELEMENTS).enumerate() {
- let current_stream = streams[chunk_idx % 2];
+ let chunk_offset = global_offset;
+ let event_slot = chunk_idx % PINNED_POOL_SIZE;
crate::profile_scope!("GPU::ChunkProcess");
@@ -225,31 +277,33 @@ where
map_allocation_error(chunk_bytes, "pipeline chunk buffer allocation", None, e)
})?;
+ // Acquire pinned staging buffer and populate it with the current chunk
+ let mut pinned_buf = pinned_pool.acquire();
+ pinned_buf.as_slice_mut()[..chunk.len()].copy_from_slice(chunk);
+
// Async copy: host to device (non-blocking, on specified stream)
// Uses CUDA Runtime API (cudaMemcpyAsync) for true async copy
{
crate::profile_scope!("GPU::H2DCopyAsync");
unsafe {
- let dst_device_ptr = *input_chunk_dev.device_ptr() as *mut c_void;
- let src_host_ptr = chunk.as_ptr() as *const c_void;
- let bytes = std::mem::size_of_val(chunk);
- let stream_handle = current_stream.stream as *mut c_void;
-
- let result = cudaMemcpyAsync(
- dst_device_ptr,
- src_host_ptr,
- bytes,
- CUDA_MEMCPY_HOST_TO_DEVICE,
- stream_handle,
- );
-
- if result != 0 {
- return Err(MahoutError::Cuda(format!(
- "Async H2D copy failed with CUDA error: {}",
- result
- )));
- }
+ ctx.async_copy_to_device(
+ pinned_buf.ptr() as *const c_void,
+ *input_chunk_dev.device_ptr() as *mut c_void,
+ chunk.len(),
+ )?;
+ ctx.record_copy_done(event_slot)?;
+ ctx.wait_for_copy(event_slot)?;
+ }
+ }
+
+ // Keep pinned buffer alive until the copy stream is synchronized.
+ in_flight_pinned.push(pinned_buf);
+ if in_flight_pinned.len() == PINNED_POOL_SIZE {
+ // Ensure previous H2D copies are done before reusing buffers.
+ unsafe {
+ ctx.sync_copy_stream()?;
}
+ in_flight_pinned.clear();
}
// Get device pointer for kernel launch
@@ -258,7 +312,7 @@ where
// Invoke caller's kernel launcher (non-blocking)
{
crate::profile_scope!("GPU::KernelLaunchAsync");
- kernel_launcher(current_stream, input_ptr, global_offset, chunk.len())?;
+ kernel_launcher(&ctx.stream_compute, input_ptr, chunk_offset, chunk.len())?;
}
// Keep buffer alive until synchronization
@@ -274,12 +328,12 @@ where
// This ensures all async copies and kernel launches have finished
{
crate::profile_scope!("GPU::StreamSync");
+ unsafe {
+ ctx.sync_copy_stream()?;
+ }
device
- .wait_for(&stream1)
- .map_err(|e| MahoutError::Cuda(format!("Stream 1 sync failed:
{:?}", e)))?;
- device
- .wait_for(&stream2)
- .map_err(|e| MahoutError::Cuda(format!("Stream 2 sync failed:
{:?}", e)))?;
+ .wait_for(&ctx.stream_compute)
+ .map_err(|e| MahoutError::Cuda(format!("Compute stream sync
failed: {:?}", e)))?;
}
// Buffers are dropped here (after sync), freeing GPU memory
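Putting the pipeline hunks together, each chunk now flows through the context roughly as in the sketch below; dst_dev_ptr, slot, input_ptr, and chunk_offset are placeholders for the values computed in run_dual_stream_pipeline above, not literal code from this patch:

    // Condensed per-chunk flow (sketch only): stage in pinned memory, copy on the
    // copy stream, gate the compute stream on the copy-done event, then launch.
    let mut staging = pinned_pool.acquire();
    staging.as_slice_mut()[..chunk.len()].copy_from_slice(chunk);
    unsafe {
        ctx.async_copy_to_device(
            staging.ptr() as *const std::ffi::c_void,    // H2D on stream_copy
            dst_dev_ptr,
            chunk.len(),
        )?;
        ctx.record_copy_done(slot)?;                     // event recorded on stream_copy
        ctx.wait_for_copy(slot)?;                        // stream_compute waits on that event
    }
    kernel_launcher(&ctx.stream_compute, input_ptr, chunk_offset, chunk.len())?;
    unsafe { ctx.sync_copy_stream()?; }                  // before reusing staging buffers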
diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs
index b8ff42fc6..c1030b809 100644
--- a/qdp/qdp-core/src/lib.rs
+++ b/qdp/qdp-core/src/lib.rs
@@ -35,17 +35,12 @@ use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
#[cfg(target_os = "linux")]
use std::thread;
-#[cfg(target_os = "linux")]
-type BufferResult = std::result::Result<(PinnedBuffer, usize), MahoutError>;
-#[cfg(target_os = "linux")]
-type BufferChannels = (SyncSender<BufferResult>, Receiver<BufferResult>);
-
use crate::dlpack::DLManagedTensor;
#[cfg(target_os = "linux")]
use crate::gpu::PipelineContext;
use crate::gpu::get_encoder;
#[cfg(target_os = "linux")]
-use crate::gpu::memory::{GpuStateVector, PinnedBuffer};
+use crate::gpu::memory::{GpuStateVector, PinnedHostBuffer};
#[cfg(target_os = "linux")]
use crate::reader::StreamingDataReader;
use cudarc::driver::{CudaDevice, DevicePtr, DevicePtrMut};
@@ -57,6 +52,10 @@ use qdp_kernels::{launch_amplitude_encode_batch, launch_l2_norm_batch};
const STAGE_SIZE_BYTES: usize = 512 * 1024 * 1024;
#[cfg(target_os = "linux")]
const STAGE_SIZE_ELEMENTS: usize = STAGE_SIZE_BYTES / std::mem::size_of::<f64>();
+#[cfg(target_os = "linux")]
+type FullBufferResult = std::result::Result<(PinnedHostBuffer, usize), MahoutError>;
+#[cfg(target_os = "linux")]
+type FullBufferChannel = (SyncSender<FullBufferResult>, Receiver<FullBufferResult>);
/// Main entry point for Mahout QDP
///
@@ -100,7 +99,7 @@ impl QdpEngine {
/// * `encoding_method` - Strategy: "amplitude", "angle", or "basis"
///
/// # Returns
- /// DLPack pointer for zero-copy PyTorch integration (shape: [1, 2^num_qubits])
+ /// DLPack pointer for zero-copy PyTorch integration
///
/// # Safety
/// Pointer freed by DLPack deleter, do not free manually.
@@ -198,18 +197,21 @@ impl QdpEngine {
let total_state_vector =
GpuStateVector::new_batch(&self.device, num_samples, num_qubits)?;
- let ctx = PipelineContext::new(&self.device)?;
+ const PIPELINE_EVENT_SLOTS: usize = 2; // matches double-buffered staging buffers
+ let ctx = PipelineContext::new(&self.device, PIPELINE_EVENT_SLOTS)?;
let dev_in_a = unsafe { self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
.map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?;
let dev_in_b = unsafe { self.device.alloc::<f64>(STAGE_SIZE_ELEMENTS) }
.map_err(|e| MahoutError::MemoryAllocation(format!("{:?}", e)))?;
- let (full_buf_tx, full_buf_rx): BufferChannels = sync_channel(2);
- let (empty_buf_tx, empty_buf_rx): (SyncSender<PinnedBuffer>, Receiver<PinnedBuffer>) =
- sync_channel(2);
+ let (full_buf_tx, full_buf_rx): FullBufferChannel = sync_channel(2);
+ let (empty_buf_tx, empty_buf_rx): (
+ SyncSender<PinnedHostBuffer>,
+ Receiver<PinnedHostBuffer>,
+ ) = sync_channel(2);
- let mut host_buf_first = PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?;
+ let mut host_buf_first = PinnedHostBuffer::new(STAGE_SIZE_ELEMENTS)?;
let first_len = reader_core.read_chunk(host_buf_first.as_slice_mut())?;
let sample_size = reader_core.get_sample_size().ok_or_else(|| {
@@ -223,23 +225,15 @@ impl QdpEngine {
}
if sample_size > STAGE_SIZE_ELEMENTS {
return Err(MahoutError::InvalidInput(format!(
- "Sample size {} exceeds staging buffer capacity {}
(elements)",
+ "Sample size {} exceeds staging buffer capacity {}",
sample_size, STAGE_SIZE_ELEMENTS
)));
}
- // Reuse a single norm buffer across chunks to avoid per-chunk allocations.
- //
- // Important: the norm buffer must outlive the async kernels that consume it.
- // Per-chunk allocation + drop can lead to use-after-free when the next chunk
- // reuses the same device memory while the previous chunk is still running.
- let max_samples_per_chunk = std::cmp::max(
- 1,
- std::cmp::min(num_samples, STAGE_SIZE_ELEMENTS / sample_size),
- );
+ let max_samples_in_chunk = STAGE_SIZE_ELEMENTS / sample_size;
let mut norm_buffer = self
.device
- .alloc_zeros::<f64>(max_samples_per_chunk)
+ .alloc_zeros::<f64>(max_samples_in_chunk)
.map_err(|e| {
MahoutError::MemoryAllocation(format!(
"Failed to allocate norm buffer: {:?}",
@@ -252,7 +246,7 @@ impl QdpEngine {
.map_err(|_| MahoutError::Io("Failed to send first
buffer".into()))?;
empty_buf_tx
- .send(PinnedBuffer::new(STAGE_SIZE_ELEMENTS)?)
+ .send(PinnedHostBuffer::new(STAGE_SIZE_ELEMENTS)?)
.map_err(|_| MahoutError::Io("Failed to send second
buffer".into()))?;
let mut reader = reader_core;
@@ -306,6 +300,7 @@ impl QdpEngine {
let samples_in_chunk = current_len / sample_size;
if samples_in_chunk > 0 {
+ let event_slot = if use_dev_a { 0 } else { 1 };
let dev_ptr = if use_dev_a {
*dev_in_a.device_ptr()
} else {
@@ -315,9 +310,13 @@ impl QdpEngine {
unsafe {
crate::profile_scope!("GPU::Dispatch");
- ctx.async_copy_to_device(&host_buffer, dev_ptr as *mut c_void, current_len);
- ctx.record_copy_done();
- ctx.wait_for_copy();
+ ctx.async_copy_to_device(
+ host_buffer.ptr() as *const c_void,
+ dev_ptr as *mut c_void,
+ current_len,
+ )?;
+ ctx.record_copy_done(event_slot)?;
+ ctx.wait_for_copy(event_slot)?;
{
crate::profile_scope!("GPU::BatchEncode");
@@ -345,10 +344,6 @@ impl QdpEngine {
.cast::<u8>()
.add(offset_bytes)
.cast::<std::ffi::c_void>();
- debug_assert!(
- samples_in_chunk <= max_samples_per_chunk,
- "samples_in_chunk must be <=
max_samples_per_chunk"
- );
{
crate::profile_scope!("GPU::NormBatch");
@@ -387,7 +382,7 @@ impl QdpEngine {
}
}
- ctx.sync_copy_stream();
+ ctx.sync_copy_stream()?;
}
global_sample_offset = global_sample_offset
.checked_add(samples_in_chunk)
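For orientation, the double-buffered reader path in lib.rs reduces to the producer/consumer shape sketched below; `reader` and the GPU dispatch body are placeholders and error propagation is trimmed relative to the real code above:

    use std::sync::mpsc::sync_channel;
    use std::thread;
    // Two pinned buffers ping-pong between the reader thread and the GPU loop.
    let (full_buf_tx, full_buf_rx): FullBufferChannel = sync_channel(2);
    let (empty_buf_tx, empty_buf_rx) = sync_channel::<PinnedHostBuffer>(2);
    thread::spawn(move || {
        // Producer: refill whichever pinned buffer the GPU loop hands back.
        while let Ok(mut buf) = empty_buf_rx.recv() {
            match reader.read_chunk(buf.as_slice_mut()) {
                Ok(0) => break,                                      // EOF
                Ok(len) => { let _ = full_buf_tx.send(Ok((buf, len))); }
                Err(e) => { let _ = full_buf_tx.send(Err(e)); break; }
            }
        }
    });
    // Consumer: copy current_len elements to the alternating device buffer,
    // encode on the compute stream, then recycle the pinned buffer.
    while let Ok(Ok((host_buffer, current_len))) = full_buf_rx.recv() {
        // ... async_copy_to_device + record_copy_done + wait_for_copy + batch encode ...
        let _ = empty_buf_tx.send(host_buffer);
    }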
diff --git a/qdp/qdp-kernels/tests/amplitude_encode.rs b/qdp/qdp-kernels/tests/amplitude_encode.rs
index f86e00fcb..0d69ca9ee 100644
--- a/qdp/qdp-kernels/tests/amplitude_encode.rs
+++ b/qdp/qdp-kernels/tests/amplitude_encode.rs
@@ -509,9 +509,9 @@ fn test_amplitude_encode_small_input_large_state() {
assert!((state_h[1].x - 0.8).abs() < EPSILON);
// Rest should be zero
- for (i, item) in state_h.iter().enumerate().take(state_len).skip(2) {
+ for (i, value) in state_h.iter().enumerate().skip(2) {
assert!(
- item.x.abs() < EPSILON && item.y.abs() < EPSILON,
+ value.x.abs() < EPSILON && value.y.abs() < EPSILON,
"Element {} should be zero-padded",
i
);