This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git

commit 4fb7065f05bad8d13dfa0afae7560e44fba61a2b
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Thu Dec 4 15:58:03 2025 +0800

    [QDP] make dataflow async and improve amplitudeEncoder  (#675)
    
    * async pipeline
    
    * prevent error
    
    * Add TODO for dynamic chunk size tuning
    
    Added a TODO comment to tune chunk size dynamically based on GPU model or PCIe bandwidth.
    
    * update
    
    * add a test
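    
    For context, the double-buffering scheme alternates chunks between two CUDA
    streams so the H2D copy of chunk N+1 can overlap the kernel for chunk N. A
    minimal sketch of the idea, using the cudarc calls that appear in this patch
    (error handling elided; not the literal implementation):
    
        let stream1 = device.fork_default_stream()?;
        let stream2 = device.fork_default_stream()?;
        let streams = [&stream1, &stream2];
        for (i, chunk) in host_data.chunks(CHUNK_SIZE_ELEMENTS).enumerate() {
            let stream = streams[i % 2]; // ping-pong between the two streams
            // 1) async H2D copy of `chunk` on `stream`
            // 2) kernel launch for `chunk` on `stream`
        }
        device.wait_for(&stream1)?; // join both streams before buffers drop
        device.wait_for(&stream2)?;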
---
 qdp/qdp-core/src/gpu/encodings/amplitude.rs | 122 ++++++++++++++++++-
 qdp/qdp-core/src/gpu/memory.rs              |   2 +
 qdp/qdp-core/src/gpu/mod.rs                 |   2 +
 qdp/qdp-core/src/gpu/pipeline.rs            | 174 ++++++++++++++++++++++++++++
 qdp/qdp-core/tests/api_workflow.rs          |  35 ++++++
 qdp/qdp-kernels/build.rs                    |   2 -
 qdp/qdp-kernels/src/amplitude.cu            |   4 +-
 qdp/qdp-kernels/src/lib.rs                  |   8 +-
 qdp/qdp-python/pyproject.toml               |   2 +-
 qdp/qdp-python/src/lib.rs                   |  11 +-
 qdp/qdp-python/uv.lock                      |   2 +-
 11 files changed, 345 insertions(+), 19 deletions(-)

diff --git a/qdp/qdp-core/src/gpu/encodings/amplitude.rs b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
index 1709eab77..38551d15c 100644
--- a/qdp/qdp-core/src/gpu/encodings/amplitude.rs
+++ b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
 use cudarc::driver::CudaDevice;
 use crate::error::{MahoutError, Result};
 use crate::gpu::memory::GpuStateVector;
+use crate::gpu::pipeline::run_dual_stream_pipeline;
 use super::QuantumEncoder;
 
 #[cfg(target_os = "linux")]
@@ -57,23 +58,27 @@ impl QuantumEncoder for AmplitudeEncoder {
                 GpuStateVector::new(_device, num_qubits)?
             };
 
-            // Copy input data to GPU (synchronous, zero-copy from slice)
-            // TODO : Use async CUDA streams for pipeline overlap
+            // SSS-Tier Optimization: Async Pipeline for large data
+            // For small data (< 1MB), use the synchronous path to avoid stream overhead.
+            // For large data, use the dual-stream async pipeline for maximum throughput.
+            const ASYNC_THRESHOLD: usize = 1024 * 1024 / std::mem::size_of::<f64>(); // 1MB threshold (131072 f64 elements)
+
+            if host_data.len() < ASYNC_THRESHOLD {
+                // Synchronous path for small data (avoids stream overhead)
             let input_slice = {
                 crate::profile_scope!("GPU::H2DCopy");
                 _device.htod_sync_copy(host_data)
                     .map_err(|e| MahoutError::MemoryAllocation(format!("Failed to allocate input buffer: {:?}", e)))?
             };
 
-            // Launch CUDA kernel (CPU-side launch only; execution is asynchronous)
             let ret = {
                 crate::profile_scope!("GPU::KernelLaunch");
                 unsafe {
                     launch_amplitude_encode(
                         *input_slice.device_ptr() as *const f64,
                         state_vector.ptr() as *mut c_void,
-                        host_data.len() as i32,
-                        state_len as i32,
+                            host_data.len(),
+                            state_len,
                         norm,
                         std::ptr::null_mut(), // default stream
                     )
@@ -89,12 +94,15 @@ impl QuantumEncoder for AmplitudeEncoder {
                 return Err(MahoutError::KernelLaunch(error_msg));
             }
 
-            // Block until all work on the device is complete
             {
                 crate::profile_scope!("GPU::Synchronize");
                 _device
                     .synchronize()
                     .map_err(|e| MahoutError::Cuda(format!("CUDA device synchronize failed: {:?}", e)))?;
+                }
+            } else {
+                // Async Pipeline path for large data
+                Self::encode_async_pipeline(_device, host_data, num_qubits, state_len, norm, &state_vector)?;
             }
 
             Ok(state_vector)
@@ -115,6 +123,108 @@ impl QuantumEncoder for AmplitudeEncoder {
     }
 }
 
+impl AmplitudeEncoder {
+    /// Async pipeline encoding for large data (SSS-tier optimization)
+    ///
+    /// Uses the generic dual-stream pipeline infrastructure to overlap
+    /// data transfer and computation. The pipeline handles all the
+    /// streaming mechanics, while this method focuses on the amplitude
+    /// encoding kernel logic.
+    #[cfg(target_os = "linux")]
+    fn encode_async_pipeline(
+        device: &Arc<CudaDevice>,
+        host_data: &[f64],
+        _num_qubits: usize,
+        state_len: usize,
+        norm: f64,
+        state_vector: &GpuStateVector,
+    ) -> Result<()> {
+        // Use generic pipeline infrastructure
+        // The closure handles amplitude-specific kernel launch logic
+        run_dual_stream_pipeline(device, host_data, |stream, input_ptr, chunk_offset, chunk_len| {
+            // Calculate offset pointer for state vector (type-safe pointer arithmetic)
+            // Offset is in complex numbers (CuDoubleComplex), not f64 elements
+            let state_ptr_offset = unsafe {
+                state_vector.ptr().cast::<u8>()
+                    .add(chunk_offset * std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+                    .cast::<std::ffi::c_void>()
+            };
+
+            // Launch amplitude encoding kernel on the provided stream
+            let ret = unsafe {
+                launch_amplitude_encode(
+                    input_ptr,
+                    state_ptr_offset,
+                    chunk_len,
+                    state_len,
+                    norm,
+                    stream.stream as *mut c_void,
+                )
+            };
+
+            if ret != 0 {
+                let error_msg = format!(
+                    "Kernel launch failed with CUDA error code: {} ({})",
+                    ret,
+                    cuda_error_to_string(ret)
+                );
+                return Err(MahoutError::KernelLaunch(error_msg));
+            }
+
+            Ok(())
+        })?;
+
+        // CRITICAL FIX: Handle padding for uninitialized memory
+        // Since we use alloc() (uninitialized), we must zero-fill any tail region
+        // that wasn't written by the pipeline. This ensures correctness when
+        // host_data.len() < state_len (e.g., 1000 elements in a 1024-element state).
+        let data_len = host_data.len();
+        if data_len < state_len {
+            let padding_start = data_len;
+            let padding_elements = state_len - padding_start;
+            let padding_bytes = padding_elements * std::mem::size_of::<qdp_kernels::CuDoubleComplex>();
+
+            // Calculate tail pointer (in complex numbers)
+            let tail_ptr = unsafe {
+                state_vector.ptr().add(padding_start) as *mut c_void
+            };
+
+            // Zero-fill padding region using CUDA Runtime API
+            // Use default stream since pipeline streams are already synchronized
+            unsafe {
+                unsafe extern "C" {
+                    fn cudaMemsetAsync(
+                        devPtr: *mut c_void,
+                        value: i32,
+                        count: usize,
+                        stream: *mut c_void,
+                    ) -> i32;
+                }
+
+                let result = cudaMemsetAsync(
+                    tail_ptr,
+                    0,
+                    padding_bytes,
+                    std::ptr::null_mut(), // default stream
+                );
+
+                if result != 0 {
+                    return Err(MahoutError::Cuda(
+                        format!("Failed to zero-fill padding region: {} ({})",
+                                result, cuda_error_to_string(result))
+                    ));
+                }
+            }
+
+            // Synchronize to ensure padding is complete before returning
+            device.synchronize()
+                .map_err(|e| MahoutError::Cuda(format!("Failed to sync after 
padding: {:?}", e)))?;
+        }
+
+        Ok(())
+    }
+}
+
 /// Convert CUDA error code to human-readable string
 #[cfg(target_os = "linux")]
 fn cuda_error_to_string(code: i32) -> &'static str {
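
A worked example of the tail zero-fill above (a sketch; the 1000-in-1024 case is
the one named in the patch comment, and CuDoubleComplex is two f64s, i.e. 16
bytes):

    let data_len = 1000usize;   // host input elements
    let state_len = 1024usize;  // 2^10 amplitudes for 10 qubits
    let padding_elements = state_len - data_len;   // 24 amplitudes left uninitialized
    let padding_bytes = padding_elements * 16;     // 16 bytes per CuDoubleComplex
    assert_eq!(padding_bytes, 384);                // region cudaMemsetAsync must zero
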
diff --git a/qdp/qdp-core/src/gpu/memory.rs b/qdp/qdp-core/src/gpu/memory.rs
index 49f26602a..513c326c0 100644
--- a/qdp/qdp-core/src/gpu/memory.rs
+++ b/qdp/qdp-core/src/gpu/memory.rs
@@ -60,6 +60,8 @@ impl GpuStateVector {
         #[cfg(target_os = "linux")]
         {
             // Use uninitialized allocation to avoid memory bandwidth waste.
+            // TODO: Consider using a memory pool for input buffers to avoid repeated
+            // cudaMalloc overhead in high-frequency encode() calls.
             let slice = unsafe {
                 _device.alloc::<CuDoubleComplex>(_size_elements)
             }.map_err(|e| MahoutError::MemoryAllocation(
diff --git a/qdp/qdp-core/src/gpu/mod.rs b/qdp/qdp-core/src/gpu/mod.rs
index 91966b8d2..fe7cdace0 100644
--- a/qdp/qdp-core/src/gpu/mod.rs
+++ b/qdp/qdp-core/src/gpu/mod.rs
@@ -16,6 +16,8 @@
 
 pub mod memory;
 pub mod encodings;
+pub mod pipeline;
 
 pub use memory::GpuStateVector;
 pub use encodings::{QuantumEncoder, AmplitudeEncoder, AngleEncoder, BasisEncoder, get_encoder};
+pub use pipeline::run_dual_stream_pipeline;
diff --git a/qdp/qdp-core/src/gpu/pipeline.rs b/qdp/qdp-core/src/gpu/pipeline.rs
new file mode 100644
index 000000000..fd9a5989d
--- /dev/null
+++ b/qdp/qdp-core/src/gpu/pipeline.rs
@@ -0,0 +1,174 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Async Pipeline Infrastructure
+//
+// Provides generic double-buffered execution for large data processing.
+// Separates the "streaming mechanics" from the "kernel logic".
+
+use std::sync::Arc;
+use std::ffi::c_void;
+use cudarc::driver::{CudaDevice, CudaSlice, DevicePtr, safe::CudaStream};
+use crate::error::{MahoutError, Result};
+
+/// Chunk processing callback for async pipeline
+///
+/// This closure is called for each chunk with:
+/// - `stream`: The CUDA stream to launch the kernel on
+/// - `input_ptr`: Device pointer to the chunk data (already copied)
+/// - `chunk_offset`: Global offset in the original data (in elements)
+/// - `chunk_len`: Length of this chunk (in elements)
+pub type ChunkProcessor = dyn FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>;
+
+/// Executes a task using dual-stream double-buffering pattern
+///
+/// This function handles the generic pipeline mechanics:
+/// - Dual stream creation and management
+/// - Data chunking and async H2D copy
+/// - Buffer lifetime management
+/// - Stream synchronization
+///
+/// The caller provides a `kernel_launcher` closure that handles the
+/// specific kernel launch logic for each chunk.
+///
+/// # Arguments
+/// * `device` - The CUDA device
+/// * `host_data` - Full source data to process
+/// * `kernel_launcher` - Closure that launches the specific kernel for each chunk
+///
+/// # Example
+/// ```rust,ignore
+/// run_dual_stream_pipeline(device, host_data, |stream, input_ptr, offset, len| {
+///     // Launch your specific kernel here
+///     launch_my_kernel(input_ptr, offset, len, stream)?;
+///     Ok(())
+/// })?;
+/// ```
+#[cfg(target_os = "linux")]
+pub fn run_dual_stream_pipeline<F>(
+    device: &Arc<CudaDevice>,
+    host_data: &[f64],
+    mut kernel_launcher: F,
+) -> Result<()>
+where
+    F: FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>,
+{
+    crate::profile_scope!("GPU::AsyncPipeline");
+
+    // 1. Create dual streams for pipeline overlap
+    let stream1 = device.fork_default_stream()
+        .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 1: 
{:?}", e)))?;
+    let stream2 = device.fork_default_stream()
+        .map_err(|e| MahoutError::Cuda(format!("Failed to create stream 2: 
{:?}", e)))?;
+    let streams = [&stream1, &stream2];
+
+    // 2. Chunk size: 8MB per chunk (a balance between launch overhead and overlap opportunity).
+    // TODO: tune this dynamically based on the detected GPU model or PCIe bandwidth.
+    // Too small = launch overhead dominates; too large = less overlap.
+    const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 / std::mem::size_of::<f64>(); // 8MB = 1,048,576 f64 elements
+
+    // 3. Keep temporary buffers alive until all streams complete
+    // This prevents Rust from dropping them while GPU is still using them
+    let mut keep_alive_buffers: Vec<CudaSlice<f64>> = Vec::new();
+
+    let mut global_offset = 0;
+
+    // 4. Pipeline loop: alternate between streams for maximum overlap
+    for (chunk_idx, chunk) in host_data.chunks(CHUNK_SIZE_ELEMENTS).enumerate() {
+        let current_stream = streams[chunk_idx % 2];
+
+        crate::profile_scope!("GPU::ChunkProcess");
+
+        // Allocate temporary device buffer for this chunk
+        let input_chunk_dev = unsafe {
+            device.alloc::<f64>(chunk.len())
+        }.map_err(|e| MahoutError::MemoryAllocation(
+            format!("Failed to allocate chunk buffer: {:?}", e)
+        ))?;
+
+        // Async copy: host to device (non-blocking, on specified stream)
+        // Uses CUDA Runtime API (cudaMemcpyAsync) for true async copy
+        {
+            crate::profile_scope!("GPU::H2DCopyAsync");
+            unsafe {
+                unsafe extern "C" {
+                    fn cudaMemcpyAsync(
+                        dst: *mut c_void,
+                        src: *const c_void,
+                        count: usize,
+                        kind: u32,
+                        stream: *mut c_void,
+                    ) -> i32;
+                }
+
+                let dst_device_ptr = *input_chunk_dev.device_ptr() as *mut c_void;
+                let src_host_ptr = chunk.as_ptr() as *const c_void;
+                let bytes = chunk.len() * std::mem::size_of::<f64>();
+                let stream_handle = current_stream.stream as *mut c_void;
+
+                // cudaMemcpyKind: cudaMemcpyHostToDevice = 1
+                const CUDA_MEMCPY_HOST_TO_DEVICE: u32 = 1;
+
+                let result = cudaMemcpyAsync(
+                    dst_device_ptr,
+                    src_host_ptr,
+                    bytes,
+                    CUDA_MEMCPY_HOST_TO_DEVICE,
+                    stream_handle,
+                );
+
+                if result != 0 {
+                    return Err(MahoutError::Cuda(
+                        format!("Async H2D copy failed with CUDA error: {}", 
result)
+                    ));
+                }
+            }
+        }
+
+        // Get device pointer for kernel launch
+        let input_ptr = *input_chunk_dev.device_ptr() as *const f64;
+
+        // Invoke caller's kernel launcher (non-blocking)
+        {
+            crate::profile_scope!("GPU::KernelLaunchAsync");
+            kernel_launcher(current_stream, input_ptr, global_offset, chunk.len())?;
+        }
+
+        // Keep buffer alive until synchronization
+        // Critical: Rust will drop CudaSlice when it goes out of scope, which calls cudaFree.
+        // We must keep these buffers alive until all GPU work completes.
+        keep_alive_buffers.push(input_chunk_dev);
+
+        // Update offset for next chunk
+        global_offset += chunk.len();
+    }
+
+    // 5. Synchronize all streams: wait for all work to complete
+    // This ensures all async copies and kernel launches have finished
+    {
+        crate::profile_scope!("GPU::StreamSync");
+        device.wait_for(&stream1)
+            .map_err(|e| MahoutError::Cuda(format!("Stream 1 sync failed: 
{:?}", e)))?;
+        device.wait_for(&stream2)
+            .map_err(|e| MahoutError::Cuda(format!("Stream 2 sync failed: 
{:?}", e)))?;
+    }
+
+    // Buffers are dropped here (after sync), freeing GPU memory
+    // This is safe because all GPU operations have completed
+    drop(keep_alive_buffers);
+
+    Ok(())
+}
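
For callers outside the amplitude encoder, the pipeline is driven as in the doc
example above. A slightly fuller sketch (launch_my_kernel is a hypothetical FFI
launcher, not part of this patch):

    run_dual_stream_pipeline(device, host_data, |stream, input_ptr, offset, len| {
        // `input_ptr` already points at this chunk on the device;
        // `offset` and `len` are in f64 elements relative to `host_data`.
        let ret = unsafe {
            launch_my_kernel(input_ptr, offset, len, stream.stream as *mut c_void)
        };
        if ret != 0 {
            return Err(MahoutError::KernelLaunch(format!("CUDA error {}", ret)));
        }
        Ok(())
    })?;
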
diff --git a/qdp/qdp-core/tests/api_workflow.rs b/qdp/qdp-core/tests/api_workflow.rs
index f88b2eb83..a1e97e31a 100644
--- a/qdp/qdp-core/tests/api_workflow.rs
+++ b/qdp/qdp-core/tests/api_workflow.rs
@@ -72,3 +72,38 @@ fn test_amplitude_encoding_workflow() {
         println!("PASS: Memory freed successfully");
     }
 }
+
+#[test]
+#[cfg(target_os = "linux")]
+fn test_amplitude_encoding_async_pipeline() {
+    println!("Testing amplitude encoding async pipeline path...");
+
+    let engine = match QdpEngine::new(0) {
+        Ok(e) => e,
+        Err(_) => {
+            println!("SKIP: No GPU available");
+            return;
+        }
+    };
+
+    // Use 200000 elements to trigger the async pipeline path (ASYNC_THRESHOLD = 131072)
+    let data = common::create_test_data(200000);
+    println!("Created test data: {} elements", data.len());
+
+    let result = engine.encode(&data, 18, "amplitude");
+    assert!(result.is_ok(), "Encoding should succeed");
+
+    let dlpack_ptr = result.unwrap();
+    assert!(!dlpack_ptr.is_null(), "DLPack pointer should not be null");
+    println!("PASS: Encoding succeeded, DLPack pointer valid");
+
+    unsafe {
+        let managed = &mut *dlpack_ptr;
+        assert!(managed.deleter.is_some(), "Deleter must be present");
+
+        println!("Calling deleter to free GPU memory");
+        let deleter = managed.deleter.take().expect("Deleter function pointer is missing!");
+        deleter(dlpack_ptr);
+        println!("PASS: Memory freed successfully");
+    }
+}
diff --git a/qdp/qdp-kernels/build.rs b/qdp/qdp-kernels/build.rs
index 2e3b01b27..c60d27c4a 100644
--- a/qdp/qdp-kernels/build.rs
+++ b/qdp/qdp-kernels/build.rs
@@ -81,6 +81,4 @@ fn main() {
         // .flag("arch=compute_89,code=sm_89")
         .file("src/amplitude.cu")
         .compile("kernels");
-
-    println!("cargo:warning=CUDA kernels compiled successfully");
 }
diff --git a/qdp/qdp-kernels/src/amplitude.cu b/qdp/qdp-kernels/src/amplitude.cu
index 9e4537a71..c652cc70d 100644
--- a/qdp/qdp-kernels/src/amplitude.cu
+++ b/qdp/qdp-kernels/src/amplitude.cu
@@ -39,8 +39,8 @@ extern "C" {
 int launch_amplitude_encode(
     const double* input_d,
     void* state_d,
-    int input_len,
-    int state_len,
+    size_t input_len,
+    size_t state_len,
     double norm,
     cudaStream_t stream
 ) {
diff --git a/qdp/qdp-kernels/src/lib.rs b/qdp/qdp-kernels/src/lib.rs
index 95970945f..a59733fb8 100644
--- a/qdp/qdp-kernels/src/lib.rs
+++ b/qdp/qdp-kernels/src/lib.rs
@@ -47,8 +47,8 @@ unsafe extern "C" {
     pub fn launch_amplitude_encode(
         input_d: *const f64,
         state_d: *mut c_void,
-        input_len: i32,
-        state_len: i32,
+        input_len: usize,
+        state_len: usize,
         norm: f64,
         stream: *mut c_void,
     ) -> i32;
@@ -62,8 +62,8 @@ unsafe extern "C" {
 pub extern "C" fn launch_amplitude_encode(
     _input_d: *const f64,
     _state_d: *mut c_void,
-    _input_len: i32,
-    _state_len: i32,
+    _input_len: usize,
+    _state_len: usize,
     _norm: f64,
     _stream: *mut c_void,
 ) -> i32 {
diff --git a/qdp/qdp-python/pyproject.toml b/qdp/qdp-python/pyproject.toml
index b109b30fc..b4e262dd8 100644
--- a/qdp/qdp-python/pyproject.toml
+++ b/qdp/qdp-python/pyproject.toml
@@ -17,7 +17,7 @@ dev = [
     "maturin>=1.10.2",
     "patchelf>=0.17.2.4",
     "pytest>=9.0.1",
-    "torch>=2.2,<2.3",
+    "torch>=2.2",
 ]
 
 [[tool.uv.index]]
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index 8d65f3467..18484cf15 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -105,7 +105,13 @@ impl Drop for QuantumTensor {
         // If consumed, PyTorch/consumer will call the deleter
         if !self.consumed && !self.ptr.is_null() {
             unsafe {
-                // Call the DLPack deleter to properly free memory
+                // Defensive check: qdp-core always provides a deleter
+                debug_assert!(
+                    (*self.ptr).deleter.is_some(),
+                    "DLManagedTensor from qdp-core should always have a 
deleter"
+                );
+
+                // Call the DLPack deleter to free memory
                 if let Some(deleter) = (*self.ptr).deleter {
                     deleter(self.ptr);
                 }
@@ -165,8 +171,7 @@ impl QdpEngine {
     ///     >>> qtensor = engine.encode([1.0, 2.0, 3.0, 4.0], num_qubits=2, encoding_method="amplitude")
     ///     >>> torch_tensor = torch.from_dlpack(qtensor)
     ///
-    /// TODO: Replace Vec<f64> with numpy array input to enable zero-copy reading.
-    /// Consider using the numpy crate (e.g., PyReadonlyArray1<f64>) for better performance.
+    /// TODO: Use numpy array input (`PyReadonlyArray1<f64>`) for zero-copy instead of `Vec<f64>`.
     fn encode(&self, data: Vec<f64>, num_qubits: usize, encoding_method: &str) -> PyResult<QuantumTensor> {
         let ptr = self.engine.encode(&data, num_qubits, encoding_method)
             .map_err(|e| PyRuntimeError::new_err(format!("Encoding failed: {}", e)))?;
diff --git a/qdp/qdp-python/uv.lock b/qdp/qdp-python/uv.lock
index 07b338f5e..baa699256 100644
--- a/qdp/qdp-python/uv.lock
+++ b/qdp/qdp-python/uv.lock
@@ -348,7 +348,7 @@ dev = [
     { name = "maturin", specifier = ">=1.10.2" },
     { name = "patchelf", specifier = ">=0.17.2.4" },
     { name = "pytest", specifier = ">=9.0.1" },
-    { name = "torch", specifier = ">=2.2,<2.3" },
+    { name = "torch", specifier = ">=2.2" },
 ]
 
 [[package]]
