This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git
commit 51f942ada7d9f0ac9ac05139c7359eaddc3d79bf
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Sun Dec 7 20:27:49 2025 +0800

    [QDP] improve amplitudeEncoders for less copy memory allocations (#690)

    * improve amplitudeEncoders for less copy memory allocations

    * Fix: apply pre-commit end-of-file-fixer

    * use pre-processing module
---
 qdp/qdp-core/src/gpu/encodings/amplitude.rs | 127 +++++++++++++++++++++++++++-
 1 file changed, 126 insertions(+), 1 deletion(-)

diff --git a/qdp/qdp-core/src/gpu/encodings/amplitude.rs b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
index 9868a17bc..6c4277125 100644
--- a/qdp/qdp-core/src/gpu/encodings/amplitude.rs
+++ b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
@@ -17,6 +17,7 @@
 // Amplitude encoding: direct state injection with L2 normalization
 
 use std::sync::Arc;
+use arrow::array::{Array, Float64Array};
 use cudarc::driver::CudaDevice;
 use crate::error::{MahoutError, Result};
 use crate::gpu::memory::GpuStateVector;
@@ -60,7 +61,7 @@ impl QuantumEncoder for AmplitudeEncoder {
             GpuStateVector::new(_device, num_qubits)?
         };
 
-        // SSS-Tier Optimization: Async Pipeline for large data
+        // Async Pipeline for large data
         // For small data (< 1MB), use synchronous path to avoid stream overhead
         // For large data, use dual-stream async pipeline for maximum throughput
         const ASYNC_THRESHOLD: usize = 1024 * 1024 / std::mem::size_of::<f64>(); // 1MB threshold
@@ -139,6 +140,130 @@ impl QuantumEncoder for AmplitudeEncoder {
     fn description(&self) -> &'static str {
         "Amplitude encoding with L2 normalization"
     }
+
+    /// Override to avoid intermediate Vec allocation. Processes chunks directly to GPU offsets.
+    fn encode_chunked(
+        &self,
+        device: &Arc<CudaDevice>,
+        chunks: &[Float64Array],
+        num_qubits: usize,
+    ) -> Result<GpuStateVector> {
+        #[cfg(target_os = "linux")]
+        {
+            let total_len: usize = chunks.iter().map(|c| c.len()).sum();
+            let state_len = 1 << num_qubits;
+
+            if total_len == 0 {
+                return Err(MahoutError::InvalidInput("Input chunks cannot be empty".to_string()));
+            }
+            if total_len > state_len {
+                return Err(MahoutError::InvalidInput(
+                    format!("Total data length {} exceeds state vector size {}", total_len, state_len)
+                ));
+            }
+            if num_qubits == 0 || num_qubits > 30 {
+                return Err(MahoutError::InvalidInput(
+                    format!("Number of qubits {} must be between 1 and 30", num_qubits)
+                ));
+            }
+
+            let state_vector = {
+                crate::profile_scope!("GPU::Alloc");
+                GpuStateVector::new(device, num_qubits)?
+            };
+
+            // Require pre-processed data (no nulls)
+            for chunk in chunks {
+                if chunk.null_count() > 0 {
+                    return Err(MahoutError::InvalidInput(
+                        format!("Chunk contains {} null values. Data must be pre-processed before encoding", chunk.null_count())
+                    ));
+                }
+            }
+
+            let norm = {
+                crate::profile_scope!("CPU::L2Norm");
+                let mut norm_sq = 0.0;
+                for chunk in chunks {
+                    norm_sq += chunk.values().iter().map(|&x| x * x).sum::<f64>();
+                }
+                let norm = norm_sq.sqrt();
+                if norm == 0.0 {
+                    return Err(MahoutError::InvalidInput("Input data has zero norm".to_string()));
+                }
+                norm
+            };
+
+            let mut current_offset = 0;
+            for chunk in chunks {
+                let chunk_len = chunk.len();
+                if chunk_len == 0 {
+                    continue;
+                }
+
+                let state_ptr_offset = unsafe {
+                    state_vector.ptr().cast::<u8>()
+                        .add(current_offset * std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
+                        .cast::<std::ffi::c_void>()
+                };
+
+                let chunk_slice = {
+                    crate::profile_scope!("GPU::ChunkH2DCopy");
+                    // Zero-copy from Arrow buffer to GPU
+                    device.htod_sync_copy(chunk.values())
+                        .map_err(|e| MahoutError::MemoryAllocation(format!("Failed to copy chunk: {:?}", e)))?
+                };
+
+                {
+                    crate::profile_scope!("GPU::KernelLaunch");
+                    let ret = unsafe {
+                        launch_amplitude_encode(
+                            *chunk_slice.device_ptr() as *const f64,
+                            state_ptr_offset,
+                            chunk_len,
+                            state_len,
+                            norm,
+                            std::ptr::null_mut(),
+                        )
+                    };
+                    if ret != 0 {
+                        return Err(MahoutError::KernelLaunch(
+                            format!("Kernel launch failed: {} ({})", ret, cuda_error_to_string(ret))
+                        ));
+                    }
+                }
+
+                current_offset += chunk_len;
+            }
+
+            if total_len < state_len {
+                let padding_bytes = (state_len - total_len) * std::mem::size_of::<qdp_kernels::CuDoubleComplex>();
+                let tail_ptr = unsafe { state_vector.ptr().add(total_len) as *mut c_void };
+
+                unsafe {
+                    unsafe extern "C" {
+                        fn cudaMemsetAsync(devPtr: *mut c_void, value: i32, count: usize, stream: *mut c_void) -> i32;
+                    }
+                    let result = cudaMemsetAsync(tail_ptr, 0, padding_bytes, std::ptr::null_mut());
+                    if result != 0 {
+                        return Err(MahoutError::Cuda(
+                            format!("Failed to zero-fill padding: {} ({})", result, cuda_error_to_string(result))
+                        ));
+                    }
+                }
+            }
+
+            device.synchronize()
+                .map_err(|e| MahoutError::Cuda(format!("Sync failed: {:?}", e)))?;
+
+            Ok(state_vector)
+        }
+
+        #[cfg(not(target_os = "linux"))]
+        {
+            Err(MahoutError::Cuda("CUDA unavailable (non-Linux)".to_string()))
+        }
+    }
 }
 
 impl AmplitudeEncoder {
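For reference, a minimal CPU-side sketch of the layout the new encode_chunked path produces: one L2 norm is taken across all chunks, each chunk is written at its running offset into a 2^num_qubits state vector, and the untouched tail stays zero (the role of the cudaMemsetAsync fill). The function name and the plain Vec<f64> inputs below are illustrative assumptions; the committed code operates on Arrow Float64Array chunks and a complex-valued GPU buffer.

    /// CPU reference for the chunked amplitude layout: global L2 norm across all
    /// chunks, chunks written at consecutive offsets, tail left zero-padded.
    fn chunked_amplitude_layout(chunks: &[Vec<f64>], num_qubits: usize) -> Option<Vec<f64>> {
        let state_len = 1usize << num_qubits;
        let total_len: usize = chunks.iter().map(|c| c.len()).sum();
        if total_len == 0 || total_len > state_len {
            return None;
        }

        // Global L2 norm over every chunk, mirroring the CPU::L2Norm pass.
        let norm = chunks
            .iter()
            .flat_map(|c| c.iter())
            .map(|x| x * x)
            .sum::<f64>()
            .sqrt();
        if norm == 0.0 {
            return None;
        }

        // Write each normalized chunk at its running offset; the remainder stays
        // zero, which is what the cudaMemsetAsync tail fill does on the GPU.
        let mut state = vec![0.0f64; state_len];
        let mut offset = 0;
        for chunk in chunks {
            for (i, &x) in chunk.iter().enumerate() {
                state[offset + i] = x / norm;
            }
            offset += chunk.len();
        }
        Some(state)
    }

    fn main() {
        let chunks = vec![vec![1.0, 2.0, 3.0], vec![4.0, 5.0]];
        // 3 qubits -> 8 amplitudes; indices 5..8 remain zero-padded.
        let state = chunked_amplitude_layout(&chunks, 3).unwrap();
        assert_eq!(state.len(), 8);
        assert_eq!(&state[5..], &[0.0, 0.0, 0.0]);
    }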
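A hedged caller sketch for the new override follows. The crate/module path, the unit-struct construction of AmplitudeEncoder, and the visibility of the QuantumEncoder trait are assumptions not shown in this diff; only the encode_chunked signature (device, chunks, num_qubits) comes from the patch.

    use std::sync::Arc;

    use arrow::array::Float64Array;
    use cudarc::driver::CudaDevice;
    // Assumed crate/module path; adjust to the actual qdp-core layout.
    use qdp_core::gpu::encodings::{AmplitudeEncoder, QuantumEncoder};

    fn main() {
        let device: Arc<CudaDevice> = CudaDevice::new(0).expect("CUDA device 0");

        // Two Arrow chunks; their combined length (8) exactly fills a 3-qubit
        // (2^3 amplitude) state vector, so no tail padding is needed.
        let chunks = vec![
            Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]),
            Float64Array::from(vec![5.0, 6.0, 7.0, 8.0]),
        ];

        // Assumes AmplitudeEncoder is constructible as a unit struct.
        let encoder = AmplitudeEncoder;
        let state = encoder
            .encode_chunked(&device, &chunks, 3)
            .expect("chunked amplitude encoding");

        // `state` holds the L2-normalized amplitudes directly on the GPU, with
        // no intermediate host-side concatenation of the chunks.
        drop(state);
    }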
