This is an automated email from the ASF dual-hosted git repository.

jiekaichang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git


The following commit(s) were added to refs/heads/main by this push:
     new e686a91cb [QDP] Add Angle Encoding Async Pipeline for Large Batch 
Uploads (#928)
e686a91cb is described below

commit e686a91cbffd8ccdd0ec4f9f1d515dc1f7acb9e4
Author: Jie-Kai Chang <[email protected]>
AuthorDate: Tue Jan 27 19:50:38 2026 +0800

    [QDP] Add Angle Encoding Async Pipeline for Large Batch Uploads (#928)
    
    * angle async pipeline
    
    Signed-off-by: 400Ping <[email protected]>
    
    * fix pre-commit
    
    Signed-off-by: 400Ping <[email protected]>
    
    * update
    
    Signed-off-by: 400Ping <[email protected]>
    
    * fix pre-commit
    
    Signed-off-by: 400Ping <[email protected]>
    
    ---------
    
    Signed-off-by: 400Ping <[email protected]>
---
 qdp/qdp-core/src/gpu/encodings/angle.rs | 80 ++++++++++++++++++++++++++++++
 qdp/qdp-core/src/gpu/pipeline.rs        | 87 +++++++++++++++++++++++++++++----
 qdp/qdp-core/tests/api_workflow.rs      | 74 ++++++++++++++++++++++++++++
 3 files changed, 232 insertions(+), 9 deletions(-)

diff --git a/qdp/qdp-core/src/gpu/encodings/angle.rs 
b/qdp/qdp-core/src/gpu/encodings/angle.rs
index 769a7e96d..353a9f4c4 100644
--- a/qdp/qdp-core/src/gpu/encodings/angle.rs
+++ b/qdp/qdp-core/src/gpu/encodings/angle.rs
@@ -25,6 +25,8 @@ use super::{QuantumEncoder, validate_qubit_count};
 use crate::error::cuda_error_to_string;
 use crate::error::{MahoutError, Result};
 use crate::gpu::memory::GpuStateVector;
+#[cfg(target_os = "linux")]
+use crate::gpu::pipeline::run_dual_stream_pipeline_aligned;
 use cudarc::driver::CudaDevice;
 use std::sync::Arc;
 
@@ -152,6 +154,18 @@ impl QuantumEncoder for AngleEncoder {
 
         let state_len = 1 << num_qubits;
 
+        const ASYNC_THRESHOLD_ELEMENTS: usize = 1024 * 1024 / 
std::mem::size_of::<f64>(); // 1MB
+        if batch_data.len() >= ASYNC_THRESHOLD_ELEMENTS {
+            return Self::encode_batch_async_pipeline(
+                device,
+                batch_data,
+                num_samples,
+                sample_size,
+                num_qubits,
+                state_len,
+            );
+        }
+
         let batch_state_vector = {
             crate::profile_scope!("GPU::AllocBatch");
             GpuStateVector::new_batch(device, num_samples, num_qubits)?
@@ -231,3 +245,69 @@ impl QuantumEncoder for AngleEncoder {
         "Angle encoding: per-qubit rotations into a product state"
     }
 }
+
+impl AngleEncoder {
+    #[cfg(target_os = "linux")]
+    fn encode_batch_async_pipeline(
+        device: &Arc<CudaDevice>,
+        batch_data: &[f64],
+        num_samples: usize,
+        sample_size: usize,
+        num_qubits: usize,
+        state_len: usize,
+    ) -> Result<GpuStateVector> {
+        let batch_state_vector = {
+            crate::profile_scope!("GPU::AllocBatch");
+            GpuStateVector::new_batch(device, num_samples, num_qubits)?
+        };
+
+        let state_ptr = batch_state_vector.ptr_f64().ok_or_else(|| {
+            MahoutError::InvalidInput(
+                "Batch state vector precision mismatch (expected float64 
buffer)".to_string(),
+            )
+        })?;
+
+        run_dual_stream_pipeline_aligned(
+            device,
+            batch_data,
+            sample_size,
+            |stream, input_ptr, chunk_offset, chunk_len| {
+                if chunk_len % sample_size != 0 || chunk_offset % sample_size 
!= 0 {
+                    return Err(MahoutError::InvalidInput(
+                        "Angle batch chunk is not aligned to sample 
size".to_string(),
+                    ));
+                }
+
+                let chunk_samples = chunk_len / sample_size;
+                let sample_offset = chunk_offset / sample_size;
+                let offset_elements = 
sample_offset.checked_mul(state_len).ok_or_else(|| {
+                    MahoutError::InvalidInput("Angle batch output offset 
overflow".to_string())
+                })?;
+
+                let state_ptr_offset = unsafe { state_ptr.add(offset_elements) 
as *mut c_void };
+                let ret = unsafe {
+                    qdp_kernels::launch_angle_encode_batch(
+                        input_ptr,
+                        state_ptr_offset,
+                        chunk_samples,
+                        state_len,
+                        num_qubits as u32,
+                        stream.stream as *mut c_void,
+                    )
+                };
+
+                if ret != 0 {
+                    return Err(MahoutError::KernelLaunch(format!(
+                        "Batch angle encoding kernel failed: {} ({})",
+                        ret,
+                        cuda_error_to_string(ret)
+                    )));
+                }
+
+                Ok(())
+            },
+        )?;
+
+        Ok(batch_state_vector)
+    }
+}
diff --git a/qdp/qdp-core/src/gpu/pipeline.rs b/qdp/qdp-core/src/gpu/pipeline.rs
index 26874ab3b..75db1f1d2 100644
--- a/qdp/qdp-core/src/gpu/pipeline.rs
+++ b/qdp/qdp-core/src/gpu/pipeline.rs
@@ -241,34 +241,102 @@ impl Drop for PipelineContext {
 pub fn run_dual_stream_pipeline<F>(
     device: &Arc<CudaDevice>,
     host_data: &[f64],
-    mut kernel_launcher: F,
+    kernel_launcher: F,
 ) -> Result<()>
 where
     F: FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>,
 {
     crate::profile_scope!("GPU::AsyncPipeline");
 
-    // Pinned host staging pool sized to the current chunking strategy 
(double-buffer by default).
     const CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 / 
std::mem::size_of::<f64>(); // 8MB
+    run_dual_stream_pipeline_with_chunk_size(
+        device,
+        host_data,
+        CHUNK_SIZE_ELEMENTS,
+        kernel_launcher,
+    )
+}
+
+/// Executes a task using dual-stream double-buffering with aligned chunk 
boundaries.
+///
+/// `align_elements` must evenly divide the host data length and ensures 
chunks do not
+/// split logical records (e.g., per-sample data in batch encoding).
+#[cfg(target_os = "linux")]
+pub fn run_dual_stream_pipeline_aligned<F>(
+    device: &Arc<CudaDevice>,
+    host_data: &[f64],
+    align_elements: usize,
+    kernel_launcher: F,
+) -> Result<()>
+where
+    F: FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>,
+{
+    crate::profile_scope!("GPU::AsyncPipelineAligned");
+
+    if align_elements == 0 {
+        return Err(MahoutError::InvalidInput(
+            "Alignment must be greater than zero".to_string(),
+        ));
+    }
+    if !host_data.len().is_multiple_of(align_elements) {
+        return Err(MahoutError::InvalidInput(format!(
+            "Host data length {} is not aligned to {} elements",
+            host_data.len(),
+            align_elements
+        )));
+    }
+
+    const BASE_CHUNK_SIZE_ELEMENTS: usize = 8 * 1024 * 1024 / 
std::mem::size_of::<f64>(); // 8MB
+    let chunk_size_elements = if align_elements >= BASE_CHUNK_SIZE_ELEMENTS {
+        align_elements
+    } else {
+        BASE_CHUNK_SIZE_ELEMENTS - (BASE_CHUNK_SIZE_ELEMENTS % align_elements)
+    };
+
+    run_dual_stream_pipeline_with_chunk_size(
+        device,
+        host_data,
+        chunk_size_elements,
+        kernel_launcher,
+    )
+}
+
+#[cfg(target_os = "linux")]
+fn run_dual_stream_pipeline_with_chunk_size<F>(
+    device: &Arc<CudaDevice>,
+    host_data: &[f64],
+    chunk_size_elements: usize,
+    mut kernel_launcher: F,
+) -> Result<()>
+where
+    F: FnMut(&CudaStream, *const f64, usize, usize) -> Result<()>,
+{
+    if chunk_size_elements == 0 {
+        return Err(MahoutError::InvalidInput(
+            "Chunk size must be greater than zero".to_string(),
+        ));
+    }
+
+    // Pinned host staging pool sized to the current chunking strategy 
(double-buffer by default).
     const PINNED_POOL_SIZE: usize = 2; // double buffering
     // 1. Create dual streams with per-slot events to coordinate copy -> 
compute
     let ctx = PipelineContext::new(device, PINNED_POOL_SIZE)?;
-    let pinned_pool = PinnedBufferPool::new(PINNED_POOL_SIZE, 
CHUNK_SIZE_ELEMENTS)
+    let pinned_pool = PinnedBufferPool::new(PINNED_POOL_SIZE, 
chunk_size_elements)
         .map_err(|e| MahoutError::Cuda(format!("Failed to create pinned buffer 
pool: {}", e)))?;
 
-    // 2. Chunk size: 8MB per chunk (balance between overhead and overlap 
opportunity)
-    // TODO: tune dynamically based on GPU/PCIe bandwidth.
-
     // 3. Keep temporary buffers alive until all streams complete
     // This prevents Rust from dropping them while GPU is still using them
     let mut keep_alive_buffers: Vec<CudaSlice<f64>> = Vec::new();
     // Keep pinned buffers alive until the copy stream has completed their H2D 
copy
     let mut in_flight_pinned: Vec<PinnedBufferHandle> = Vec::new();
 
-    let mut global_offset = 0;
-
     // 4. Pipeline loop: copy on copy stream, compute on compute stream with 
event handoff
-    for (chunk_idx, chunk) in 
host_data.chunks(CHUNK_SIZE_ELEMENTS).enumerate() {
+    let mut chunk_idx = 0usize;
+    let mut global_offset = 0usize;
+    while global_offset < host_data.len() {
+        let remaining = host_data.len() - global_offset;
+        let chunk_len = remaining.min(chunk_size_elements);
+        let chunk = &host_data[global_offset..global_offset + chunk_len];
         let chunk_offset = global_offset;
         let event_slot = chunk_idx % PINNED_POOL_SIZE;
 
@@ -330,6 +398,7 @@ where
 
         // Update offset for next chunk
         global_offset += chunk.len();
+        chunk_idx += 1;
     }
 
     // 5. Synchronize all streams: wait for all work to complete
diff --git a/qdp/qdp-core/tests/api_workflow.rs 
b/qdp/qdp-core/tests/api_workflow.rs
index ff7f0d77c..bc94d4e34 100644
--- a/qdp/qdp-core/tests/api_workflow.rs
+++ b/qdp/qdp-core/tests/api_workflow.rs
@@ -16,7 +16,13 @@
 
 // API workflow tests: Engine initialization and encoding
 
+#[cfg(target_os = "linux")]
+use cudarc::driver::CudaDevice;
+#[cfg(target_os = "linux")]
+use qdp_core::MahoutError;
 use qdp_core::QdpEngine;
+#[cfg(target_os = "linux")]
+use qdp_core::gpu::pipeline::run_dual_stream_pipeline_aligned;
 
 mod common;
 
@@ -113,6 +119,74 @@ fn test_amplitude_encoding_async_pipeline() {
     }
 }
 
+#[test]
+#[cfg(target_os = "linux")]
+fn test_angle_encoding_async_pipeline() {
+    println!("Testing angle encoding async pipeline path...");
+
+    let engine = match QdpEngine::new(0) {
+        Ok(e) => e,
+        Err(_) => {
+            println!("SKIP: No GPU available");
+            return;
+        }
+    };
+
+    let num_qubits = 4;
+    let sample_size = num_qubits;
+    let num_samples = 32768; // 32768 * 4 = 131072 elements (>= 1MB threshold)
+    let batch_data = common::create_test_data(num_samples * sample_size);
+
+    let result = engine.encode_batch(&batch_data, num_samples, sample_size, 
num_qubits, "angle");
+    let dlpack_ptr = result.expect("Angle batch encoding should succeed");
+    assert!(!dlpack_ptr.is_null(), "DLPack pointer should not be null");
+    println!("PASS: Angle batch encoding succeeded, DLPack pointer valid");
+
+    unsafe {
+        let managed = &mut *dlpack_ptr;
+        assert!(managed.deleter.is_some(), "Deleter must be present");
+
+        println!("Calling deleter to free GPU memory");
+        let deleter = managed
+            .deleter
+            .take()
+            .expect("Deleter function pointer is missing!");
+        deleter(dlpack_ptr);
+        println!("PASS: Memory freed successfully");
+    }
+}
+
+#[test]
+#[cfg(target_os = "linux")]
+fn test_angle_async_alignment_error() {
+    println!("Testing angle async pipeline alignment error...");
+
+    let device = match CudaDevice::new(0) {
+        Ok(d) => d,
+        Err(_) => {
+            println!("SKIP: No GPU available");
+            return;
+        }
+    };
+
+    let misaligned_data = vec![0.0_f64; 10];
+    let result =
+        run_dual_stream_pipeline_aligned(&device, &misaligned_data, 4, |_, _, 
_, _| Ok(()));
+
+    match result {
+        Err(MahoutError::InvalidInput(msg)) => {
+            assert!(
+                msg.contains("not aligned"),
+                "Expected alignment error, got: {}",
+                msg
+            );
+            println!("PASS: Alignment error surfaced as expected");
+        }
+        Err(e) => panic!("Unexpected error: {:?}", e),
+        Ok(_) => panic!("Expected alignment error, got Ok"),
+    }
+}
+
 #[test]
 #[cfg(target_os = "linux")]
 fn test_batch_dlpack_2d_shape() {

Reply via email to