This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git
commit 4fb7065f05bad8d13dfa0afae7560e44fba61a2b
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Thu Dec 4 15:58:03 2025 +0800

    [QDP] make dataflow async and improve amplitudeEncoder (#675)

    * async pipeline

    * prevent error

    * Add TODO for dynamic chunk size tuning

      Added a TODO comment to tune chunk size dynamically based on GPU
      model or PCIe bandwidth.

    * update

    * add a test
---
 qdp/qdp-core/src/gpu/encodings/amplitude.rs | 122 ++++++++++++++++++-
 qdp/qdp-core/src/gpu/memory.rs              |   2 +
 qdp/qdp-core/src/gpu/mod.rs                 |   2 +
 qdp/qdp-core/src/gpu/pipeline.rs            | 174 ++++++++++++++++++++++++++++
 qdp/qdp-core/tests/api_workflow.rs          |  35 ++++++
 qdp/qdp-kernels/build.rs                    |   2 -
 qdp/qdp-kernels/src/amplitude.cu            |   4 +-
 qdp/qdp-kernels/src/lib.rs                  |   8 +-
 qdp/qdp-python/pyproject.toml               |   2 +-
 qdp/qdp-python/src/lib.rs                   |  11 +-
 qdp/qdp-python/uv.lock                      |   2 +-
 11 files changed, 345 insertions(+), 19 deletions(-)

diff --git a/qdp/qdp-core/src/gpu/encodings/amplitude.rs b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
index 1709eab77..38551d15c 100644
--- a/qdp/qdp-core/src/gpu/encodings/amplitude.rs
+++ b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
 use cudarc::driver::CudaDevice;
 use crate::error::{MahoutError, Result};
 use crate::gpu::memory::GpuStateVector;
+use crate::gpu::pipeline::run_dual_stream_pipeline;
 use super::QuantumEncoder;
 
 #[cfg(target_os = "linux")]
@@ -57,23 +58,27 @@
         GpuStateVector::new(_device, num_qubits)?
     };
 
-        // Copy input data to GPU (synchronous, zero-copy from slice)
-        // TODO : Use async CUDA streams for pipeline overlap
+        // SSS-Tier Optimization: Async Pipeline for large data
+        // For small data (< 1MB), use synchronous path to avoid stream overhead
+        // For large data, use dual-stream async pipeline for maximum throughput
+        const ASYNC_THRESHOLD: usize = 1024 * 1024 / std::mem::size_of::<f64>(); // 1MB threshold
+
+        if host_data.len() < ASYNC_THRESHOLD {
+            // Synchronous path for small data (avoids stream overhead)
         let input_slice = {
             crate::profile_scope!("GPU::H2DCopy");
             _device.htod_sync_copy(host_data)
                 .map_err(|e| MahoutError::MemoryAllocation(format!("Failed to allocate input buffer: {:?}", e)))?
         };
 
-        // Launch CUDA kernel (CPU-side launch only; execution is asynchronous)
         let ret = {
             crate::profile_scope!("GPU::KernelLaunch");
             unsafe {
                 launch_amplitude_encode(
                     *input_slice.device_ptr() as *const f64,
                     state_vector.ptr() as *mut c_void,
-                    host_data.len() as i32,
-                    state_len as i32,
+                    host_data.len(),
+                    state_len,
                     norm,
                     std::ptr::null_mut(), // default stream
                 )
@@ -89,12 +94,15 @@
             return Err(MahoutError::KernelLaunch(error_msg));
         }
 
-        // Block until all work on the device is complete
         {
             crate::profile_scope!("GPU::Synchronize");
             _device
                 .synchronize()
                 .map_err(|e| MahoutError::Cuda(format!("CUDA device synchronize failed: {:?}", e)))?;
+        }
+    } else {
+        // Async Pipeline path for large data
+        Self::encode_async_pipeline(_device, host_data, num_qubits, state_len, norm, &state_vector)?;
     }
 
     Ok(state_vector)
@@ -115,6 +123,108 @@
     }
 }
 
+impl AmplitudeEncoder {
+    /// Async pipeline encoding for large data (SSS-tier optimization)
+    ///
+    /// Uses the generic dual-stream pipeline infrastructure to overlap
+    /// data transfer and computation. The pipeline handles all the
+    /// streaming mechanics, while this method focuses on the amplitude
+    /// encoding kernel logic.
+    #[cfg(target_os = "linux")]
+    fn encode_async_pipeline(
+        device: &Arc<CudaDevice>,
+        host_data: &[f64],
+        _num_qubits: usize,
+        state_len: usize,
+        norm: f64,
+        state_vector: &GpuStateVector,
+    ) -> Result<()> {
+        // Use generic pipeline infrastructure
+        // The closure handles amplitude-specific kernel launch logic
+        run_dual_stream_pipeline(device, host_data, |stream, input_ptr, chunk_offset, chunk_len| {
+            // Calculate offset pointer for state vector (type-safe pointer arithmetic)
+            // Offset is in complex numbers (CuDoubleComplex), not f64 elements
+            let state_ptr_offset = unsafe {
+                state_vector.ptr().cast::<u8>()
+                    .add(chunk_offset * std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+                    .cast::<std::ffi::c_void>()
+            };
+
+            // Launch amplitude encoding kernel on the provided stream
+            let ret = unsafe {
+                launch_amplitude_encode(
+                    input_ptr,
+                    state_ptr_offset,
+                    chunk_len,
+                    state_len,
+                    norm,
+                    stream.stream as *mut c_void,
+                )
+            };
+
+            if ret != 0 {
+                let error_msg = format!(
+                    "Kernel launch failed with CUDA error code: {} ({})",
+                    ret,
+                    cuda_error_to_string(ret)
+                );
+                return Err(MahoutError::KernelLaunch(error_msg));
+            }
+
+            Ok(())
+        })?;
+
+        // CRITICAL FIX: Handle padding for uninitialized memory
+        // Since we use alloc() (uninitialized), we must zero-fill any tail region
+        // that wasn't written by the pipeline. This ensures correctness when
+        // host_data.len() < state_len (e.g., 1000 elements in a 1024-element state).
+        let data_len = host_data.len();
+        if data_len < state_len {
+            let padding_start = data_len;
+            let padding_elements = state_len - padding_start;
+            let padding_bytes = padding_elements * std::mem::size_of::<qdp_kernels::CuDoubleComplex>();
+
+            // Calculate tail pointer (in complex numbers)
+            let tail_ptr = unsafe {
+                state_vector.ptr().add(padding_start) as *mut c_void
+            };
+
+            // Zero-fill padding region using CUDA Runtime API
+            // Use default stream since pipeline streams are already synchronized
+            unsafe {
+                unsafe extern "C" {
+                    fn cudaMemsetAsync(
+                        devPtr: *mut c_void,
+                        value: i32,
+                        count: usize,
+                        stream: *mut c_void,
+                    ) -> i32;
+                }
+
+                let result = cudaMemsetAsync(
+                    tail_ptr,
+                    0,
+                    padding_bytes,
+                    std::ptr::null_mut(), // default stream
+                );
+
+                if result != 0 {
+                    return Err(MahoutError::Cuda(
+                        format!("Failed to zero-fill padding region: {} ({})",
+                            result, cuda_error_to_string(result))
+                    ));
+                }
+            }
+
+            // Synchronize to ensure padding is complete before returning
+            device.synchronize()
+                .map_err(|e| MahoutError::Cuda(format!("Failed to sync after padding: {:?}", e)))?;
+        }
+
+        Ok(())
+    }
+}
+
 /// Convert CUDA error code to human-readable string
 #[cfg(target_os = "linux")]
 fn cuda_error_to_string(code: i32) -> &'static str {
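For reference, the dispatch and padding arithmetic used in the encoder above, as a standalone sketch (an editor's illustration, not part of the patch):

fn main() {
    // 1 MiB of f64 input = 131072 elements; below this the sync path is taken.
    const ASYNC_THRESHOLD: usize = 1024 * 1024 / std::mem::size_of::<f64>();
    assert_eq!(ASYNC_THRESHOLD, 131_072);

    // The padding example from the comments: 1000 input elements in a
    // 10-qubit (2^10 = 1024) state leave 24 complex amplitudes that must
    // be zero-filled, because the state buffer is allocated uninitialized.
    let (data_len, state_len) = (1000usize, 1usize << 10);
    assert_eq!(state_len - data_len, 24);
}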
diff --git a/qdp/qdp-core/src/gpu/memory.rs b/qdp/qdp-core/src/gpu/memory.rs
index 49f26602a..513c326c0 100644
--- a/qdp/qdp-core/src/gpu/memory.rs
+++ b/qdp/qdp-core/src/gpu/memory.rs
@@ -60,6 +60,8 @@ impl GpuStateVector {
         #[cfg(target_os = "linux")]
         {
             // Use uninitialized allocation to avoid memory bandwidth waste.
+            // TODO: Consider using a memory pool for input buffers to avoid repeated
+            // cudaMalloc overhead in high-frequency encode() calls.
             let slice = unsafe { _device.alloc::<CuDoubleComplex>(_size_elements) }
                 .map_err(|e| MahoutError::MemoryAllocation(
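The pooling idea in that TODO, sketched with plain Vec<f64> buffers standing in for device allocations; the real version would hold cudarc CudaSlice buffers, and all names here are hypothetical:

use std::collections::HashMap;

// Free-list pool keyed by buffer length: acquire() reuses a previously
// released buffer of the same size instead of allocating a new one.
struct BufferPool {
    free: HashMap<usize, Vec<Vec<f64>>>,
}

impl BufferPool {
    fn new() -> Self {
        Self { free: HashMap::new() }
    }

    fn acquire(&mut self, len: usize) -> Vec<f64> {
        self.free
            .get_mut(&len)
            .and_then(|bufs| bufs.pop())
            .unwrap_or_else(|| vec![0.0; len]) // allocate only on pool miss
    }

    fn release(&mut self, buf: Vec<f64>) {
        self.free.entry(buf.len()).or_default().push(buf);
    }
}

fn main() {
    let mut pool = BufferPool::new();
    let a = pool.acquire(1024);
    pool.release(a);
    let b = pool.acquire(1024); // reuses the buffer released above
    assert_eq!(b.len(), 1024);
}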
diff --git a/qdp/qdp-core/src/gpu/mod.rs b/qdp/qdp-core/src/gpu/mod.rs
index 91966b8d2..fe7cdace0 100644
--- a/qdp/qdp-core/src/gpu/mod.rs
+++ b/qdp/qdp-core/src/gpu/mod.rs
@@ -16,6 +16,8 @@
 pub mod memory;
 pub mod encodings;
+pub mod pipeline;
 
 pub use memory::GpuStateVector;
 pub use encodings::{QuantumEncoder, AmplitudeEncoder, AngleEncoder, BasisEncoder, get_encoder};
+pub use pipeline::run_dual_stream_pipeline;
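The re-exported run_dual_stream_pipeline (defined in the new pipeline.rs below) accepts the kernel launch as an FnMut closure. A minimal sketch of that callback shape, using a dummy stream type so it runs without a GPU (not the real cudarc API):

// Stand-in for cudarc's CudaStream; only the alternation logic matters here.
struct Stream(u32);

fn run_pipeline<F>(data: &[f64], chunk_len: usize, mut launch: F) -> Result<(), String>
where
    F: FnMut(&Stream, usize, usize) -> Result<(), String>,
{
    let streams = [Stream(1), Stream(2)];
    let mut offset = 0;
    for (i, chunk) in data.chunks(chunk_len).enumerate() {
        launch(&streams[i % 2], offset, chunk.len())?; // alternate streams
        offset += chunk.len();
    }
    Ok(())
}

fn main() {
    let data = vec![0.0_f64; 10];
    run_pipeline(&data, 4, |stream, offset, len| {
        println!("stream {}: offset {offset}, len {len}", stream.0);
        Ok(())
    })
    .unwrap();
}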
diff --git a/qdp/qdp-core/src/gpu/pipeline.rs b/qdp/qdp-core/src/gpu/pipeline.rs
new file mode 100644
index 000000000..fd9a5989d
--- /dev/null
+++ b/qdp/qdp-core/src/gpu/pipeline.rs
@@ -0,0 +1,174 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Async Pipeline Infrastructure
+//
+// Provides generic double-buffered execution for large data processing.
+// Separates the "streaming mechanics" from the "kernel logic".
+
+use std::sync::Arc;
+use std::ffi::c_void;
+use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
+use crate::error::{MahoutError, Result};
+
+/// Chunk processing callback for async pipeline
+///
+/// This closure is called for each chunk with:
+/// - `stream`: The CUDA stream to launch the kernel on
+/// - `input_ptr`: Device pointer to the chunk data (already copied)
+/// - `chunk_offset`: Global offset in the original data (in elements)
+/// - `chunk_len`: Length of this chunk (in elements)
+pub type ChunkProcessor = dyn FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>;
+
+/// Executes a task using the dual-stream double-buffering pattern
+///
+/// This function handles the generic pipeline mechanics:
+/// - Dual stream creation and management
+/// - Data chunking and async H2D copy
+/// - Buffer lifetime management
+/// - Stream synchronization
+///
+/// The caller provides a `kernel_launcher` closure that handles the
+/// specific kernel launch logic for each chunk.
+///
+/// # Arguments
+/// * `device` - The CUDA device
+/// * `host_data` - Full source data to process
+/// * `kernel_launcher` - Closure that launches the specific kernel for each chunk
+///
+/// # Example
+/// ```rust,ignore
+/// run_dual_stream_pipeline(device, host_data, |stream, input_ptr, offset, len| {
+///     // Launch your specific kernel here
+///     launch_my_kernel(input_ptr, offset, len, stream)?;
+///     Ok(())
+/// })?;
+/// ```
+#[cfg(target_os = "linux")]
+pub fn run_dual_stream_pipeline<F>(
+    device: &Arc<CudaDevice>,
+    host_data: &[f64],
+    mut kernel_launcher: F,
+) -> Result<()>
+where
+    F: FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>,
+{
+    crate::profile_scope!("GPU::AsyncPipeline");
+
+    // 1. Create dual streams for pipeline overlap
+    let stream1 = device.fork_default_stream()
+        .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 1: {:?}", e)))?;
+    let stream2 = device.fork_default_stream()
+        .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 2: {:?}", e)))?;
+    let streams = [&stream1, &stream2];
+
+    // 2. Chunk size: 8MB per chunk (balance between overhead and overlap opportunity)
+    // TODO: tune this dynamically based on the detected GPU model or PCIe bandwidth.
+    // Too small = launch overhead dominates; too large = less overlap
+    const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 / std::mem::size_of::<f64>(); // 8MB
+
+    // 3. Keep temporary buffers alive until all streams complete
+    // This prevents Rust from dropping them while the GPU is still using them
+    let mut keep_alive_buffers: Vec<CudaSlice<f64>> = Vec::new();
+
+    let mut global_offset = 0;
+
+    // 4. Pipeline loop: alternate between streams for maximum overlap
+    for (chunk_idx, chunk) in host_data.chunks(CHUNK_SIZE_ELEMENTS).enumerate() {
+        let current_stream = streams[chunk_idx % 2];
+
+        crate::profile_scope!("GPU::ChunkProcess");
+
+        // Allocate temporary device buffer for this chunk
+        let input_chunk_dev = unsafe {
+            device.alloc::<f64>(chunk.len())
+        }.map_err(|e| MahoutError::MemoryAllocation(
+            format!("Failed to allocate chunk buffer: {:?}", e)
+        ))?;
+
+        // Async copy: host to device (non-blocking, on specified stream)
+        // Uses CUDA Runtime API (cudaMemcpyAsync) for true async copy
+        {
+            crate::profile_scope!("GPU::H2DCopyAsync");
+            unsafe {
+                unsafe extern "C" {
+                    fn cudaMemcpyAsync(
+                        dst: *mut c_void,
+                        src: *const c_void,
+                        count: usize,
+                        kind: u32,
+                        stream: *mut c_void,
+                    ) -> i32;
+                }
+
+                let dst_device_ptr = *input_chunk_dev.device_ptr() as *mut c_void;
+                let src_host_ptr = chunk.as_ptr() as *const c_void;
+                let bytes = chunk.len() * std::mem::size_of::<f64>();
+                let stream_handle = current_stream.stream as *mut c_void;
+
+                // cudaMemcpyKind: cudaMemcpyHostToDevice = 1
+                const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1;
+
+                let result = cudaMemcpyAsync(
+                    dst_device_ptr,
+                    src_host_ptr,
+                    bytes,
+                    CUDA_MEMCPY_HOST_TO_DEVICE,
+                    stream_handle,
+                );
+
+                if result != 0 {
+                    return Err(MahoutError::Cuda(
+                        format!("Async H2D copy failed with CUDA error: {}", result)
+                    ));
+                }
+            }
+        }
+
+        // Get device pointer for kernel launch
+        let input_ptr = *input_chunk_dev.device_ptr() as *const f64;
+
+        // Invoke caller's kernel launcher (non-blocking)
+        {
+            crate::profile_scope!("GPU::KernelLaunchAsync");
+            kernel_launcher(current_stream, input_ptr, global_offset, chunk.len())?;
+        }
+
+        // Keep buffer alive until synchronization
+        // Critical: Rust will drop CudaSlice when it goes out of scope, which calls cudaFree.
+        // We must keep these buffers alive until all GPU work completes.
+        keep_alive_buffers.push(input_chunk_dev);
+
+        // Update offset for next chunk
+        global_offset += chunk.len();
+    }
+
+    // 5. Synchronize all streams: wait for all work to complete
+    // This ensures all async copies and kernel launches have finished
+    {
+        crate::profile_scope!("GPU::StreamSync");
+        device.wait_for(&stream1)
+            .map_err(|e| MahoutError::Cuda(format!("Stream 1 sync failed: {:?}", e)))?;
+        device.wait_for(&stream2)
+            .map_err(|e| MahoutError::Cuda(format!("Stream 2 sync failed: {:?}", e)))?;
+    }
+
+    // Buffers are dropped here (after sync), freeing GPU memory
+    // This is safe because all GPU operations have completed
+    drop(keep_alive_buffers);
+
+    Ok(())
+}
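For scale, the chunk arithmetic above worked standalone (an editor's sketch, not part of the patch). Note that the async-pipeline test added below uses 200000 elements: above the 131072-element ASYNC_THRESHOLD, so the pipeline path is taken, yet still within a single 8 MiB chunk, so it does not exercise multi-chunk stream overlap.

fn main() {
    const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 / std::mem::size_of::<f64>();
    assert_eq!(CHUNK_SIZE_ELEMENTS, 1_048_576); // one chunk holds ~1Mi f64 values

    let n: usize = 200_000;
    assert_eq!(n.div_ceil(CHUNK_SIZE_ELEMENTS), 1); // the test's input is one chunk
}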
diff --git a/qdp/qdp-core/tests/api_workflow.rs b/qdp/qdp-core/tests/api_workflow.rs
index f88b2eb83..a1e97e31a 100644
--- a/qdp/qdp-core/tests/api_workflow.rs
+++ b/qdp/qdp-core/tests/api_workflow.rs
@@ -72,3 +72,38 @@ fn test_amplitude_encoding_workflow() {
         println!("PASS: Memory freed successfully");
     }
 }
+
+#[test]
+#[cfg(target_os = "linux")]
+fn test_amplitude_encoding_async_pipeline() {
+    println!("Testing amplitude encoding async pipeline path...");
+
+    let engine = match QdpEngine::new(0) {
+        Ok(e) => e,
+        Err(_) => {
+            println!("SKIP: No GPU available");
+            return;
+        }
+    };
+
+    // Use 200000 elements to trigger async pipeline path (ASYNC_THRESHOLD = 131072)
+    let data = common::create_test_data(200000);
+    println!("Created test data: {} elements", data.len());
+
+    let result = engine.encode(&data, 18, "amplitude");
+    assert!(result.is_ok(), "Encoding should succeed");
+
+    let dlpack_ptr = result.unwrap();
+    assert!(!dlpack_ptr.is_null(), "DLPack pointer should not be null");
+    println!("PASS: Encoding succeeded, DLPack pointer valid");
+
+    unsafe {
+        let managed = &mut *dlpack_ptr;
+        assert!(managed.deleter.is_some(), "Deleter must be present");
+
+        println!("Calling deleter to free GPU memory");
+        let deleter = managed.deleter.take().expect("Deleter function pointer is missing!");
+        deleter(dlpack_ptr);
+        println!("PASS: Memory freed successfully");
+    }
+}
diff --git a/qdp/qdp-kernels/build.rs b/qdp/qdp-kernels/build.rs
index 2e3b01b27..c60d27c4a 100644
--- a/qdp/qdp-kernels/build.rs
+++ b/qdp/qdp-kernels/build.rs
@@ -81,6 +81,4 @@ fn main() {
         // .flag("arch=compute_89,code=sm_89")
         .file("src/amplitude.cu")
         .compile("kernels");
-
-    println!("cargo:warning=CUDA kernels compiled successfully");
 }
diff --git a/qdp/qdp-kernels/src/amplitude.cu b/qdp/qdp-kernels/src/amplitude.cu
index 9e4537a71..c652cc70d 100644
--- a/qdp/qdp-kernels/src/amplitude.cu
+++ b/qdp/qdp-kernels/src/amplitude.cu
@@ -39,8 +39,8 @@ extern "C" {
 int launch_amplitude_encode(
     const double* input_d,
     void* state_d,
-    int input_len,
-    int state_len,
+    size_t input_len,
+    size_t state_len,
     double norm,
     cudaStream_t stream
 ) {
diff --git a/qdp/qdp-kernels/src/lib.rs b/qdp/qdp-kernels/src/lib.rs
index 95970945f..a59733fb8 100644
--- a/qdp/qdp-kernels/src/lib.rs
+++ b/qdp/qdp-kernels/src/lib.rs
@@ -47,8 +47,8 @@ unsafe extern "C" {
     pub fn launch_amplitude_encode(
         input_d: *const f64,
         state_d: *mut c_void,
-        input_len: i32,
-        state_len: i32,
+        input_len: usize,
+        state_len: usize,
         norm: f64,
         stream: *mut c_void,
     ) -> i32;
@@ -62,8 +62,8 @@ unsafe extern "C" {
 pub extern "C" fn launch_amplitude_encode(
     _input_d: *const f64,
     _state_d: *mut c_void,
-    _input_len: i32,
-    _state_len: i32,
+    _input_len: usize,
+    _state_len: usize,
     _norm: f64,
     _stream: *mut c_void,
 ) -> i32 {
diff --git a/qdp/qdp-python/pyproject.toml b/qdp/qdp-python/pyproject.toml
index b109b30fc..b4e262dd8 100644
--- a/qdp/qdp-python/pyproject.toml
+++ b/qdp/qdp-python/pyproject.toml
@@ -17,7 +17,7 @@ dev = [
     "maturin>=1.10.2",
     "patchelf>=0.17.2.4",
     "pytest>=9.0.1",
-    "torch>=2.2,<2.3",
+    "torch>=2.2",
 ]
 
 [[tool.uv.index]]
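One plausible motivation for the i32 -> usize/size_t widening in the kernel FFI above (an editor's inference, but easy to verify): a 31-qubit state vector already has more amplitudes than i32 can represent.

fn main() {
    let num_qubits = 31u32;
    let state_len: usize = 1usize << num_qubits;  // 2^31 amplitudes
    assert!(state_len > i32::MAX as usize);       // 2147483648 > 2147483647
    println!("state_len for {num_qubits} qubits = {state_len}");
}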
"maturin>=1.10.2", "patchelf>=0.17.2.4", "pytest>=9.0.1", - "torch>=2.2,<2.3", + "torch>=2.2", ] [[tool.uv.index]] diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs index 8d65f3467..18484cf15 100644 --- a/qdp/qdp-python/src/lib.rs +++ b/qdp/qdp-python/src/lib.rs @@ -105,7 +105,13 @@ impl Drop for QuantumTensor { // If consumed, PyTorch/consumer will call the deleter if !self.consumed && !self.ptr.is_null() { unsafe { - // Call the DLPack deleter to properly free memory + // Defensive check: qdp-core always provides a deleter + debug_assert!( + (*self.ptr).deleter.is_some(), + "DLManagedTensor from qdp-core should always have a deleter" + ); + + // Call the DLPack deleter to free memory if let Some(deleter) = (*self.ptr).deleter { deleter(self.ptr); } @@ -165,8 +171,7 @@ impl QdpEngine { /// >>> qtensor = engine.encode([1.0, 2.0, 3.0, 4.0], num_qubits=2, encoding_method="amplitude") /// >>> torch_tensor = torch.from_dlpack(qtensor) /// - /// TODO: Replace Vec<f64> with numpy array input to enable zero-copy reading. - /// Consider using the numpy crate (e.g., PyReadonlyArray1<f64>) for better performance. + /// TODO: Use numpy array input (`PyReadonlyArray1<f64>`) for zero-copy instead of `Vec<f64>`. fn encode(&self, data: Vec<f64>, num_qubits: usize, encoding_method: &str) -> PyResult<QuantumTensor> { let ptr = self.engine.encode(&data, num_qubits, encoding_method) .map_err(|e| PyRuntimeError::new_err(format!("Encoding failed: {}", e)))?; diff --git a/qdp/qdp-python/uv.lock b/qdp/qdp-python/uv.lock index 07b338f5e..baa699256 100644 --- a/qdp/qdp-python/uv.lock +++ b/qdp/qdp-python/uv.lock @@ -348,7 +348,7 @@ dev = [ { name = "maturin", specifier = ">=1.10.2" }, { name = "patchelf", specifier = ">=0.17.2.4" }, { name = "pytest", specifier = ">=9.0.1" }, - { name = "torch", specifier = ">=2.2,<2.3" }, + { name = "torch", specifier = ">=2.2" }, ] [[package]]
