[
https://issues.apache.org/jira/browse/MAHOUT-878?focusedWorklogId=1001262&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-1001262
]
ASF GitHub Bot logged work on MAHOUT-878:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Jan/26 03:24
Start Date: 22/Jan/26 03:24
Worklog Time Spent: 10m
Work Description: rich7420 commented on code in PR #881:
URL: https://github.com/apache/mahout/pull/881#discussion_r2715158517
##########
qdp/qdp-core/src/lib.rs:
##########
@@ -300,6 +300,269 @@ impl QdpEngine {
encoding_method,
)
}
+
+ /// Encode from existing GPU pointer (zero-copy for CUDA tensors)
+ ///
+ /// This method enables zero-copy encoding from PyTorch CUDA tensors by accepting
+ /// a raw GPU pointer directly, avoiding the GPU→CPU→GPU copy that would otherwise
+ /// be required.
+ ///
+ /// TODO: Refactor to use QuantumEncoder trait (add `encode_from_gpu_ptr` to trait)
+ /// to reduce duplication with AmplitudeEncoder::encode(). This would also make it
+ /// easier to add GPU pointer support for other encoders (angle, basis) in the future.
+ ///
+ /// # Arguments
+ /// * `input_d` - Device pointer to input data (f64 array on GPU)
+ /// * `input_len` - Number of f64 elements in the input
+ /// * `num_qubits` - Number of qubits for encoding
+ /// * `encoding_method` - Strategy (currently only "amplitude" supported)
+ ///
+ /// # Returns
+ /// DLPack pointer for zero-copy PyTorch integration
+ ///
+ /// # Safety
+ /// The input pointer must:
+ /// - Point to valid GPU memory on the same device as the engine
+ /// - Contain at least `input_len` f64 elements
+ /// - Remain valid for the duration of this call
+ #[cfg(target_os = "linux")]
+ pub unsafe fn encode_from_gpu_ptr(
Review Comment:
The `encode_from_gpu_ptr` and `encode_batch_from_gpu_ptr` methods in
`QdpEngine` duplicate logic from `AmplitudeEncoder::encode()` and
`AmplitudeEncoder::encode_batch()`.
I think we could add `encode_from_gpu_ptr` and `encode_batch_from_gpu_ptr`
methods to the `QuantumEncoder` trait (in
`qdp/qdp-core/src/gpu/encodings/mod.rs`), implement them in
`AmplitudeEncoder` (moving the logic out of `QdpEngine`), and simplify
`QdpEngine::encode_from_gpu_ptr` to just look up the encoder and call its
method. WDYT?
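A minimal sketch of the proposed trait-based refactor (hypothetical names and heavily simplified signatures; the real methods take a CUDA device handle and return DLPack pointers, and the error type lives in the qdp-core crate):

```rust
// Simplified stand-ins for the real qdp-core types (illustrative only).
#[derive(Debug, PartialEq)]
enum MahoutError {
    NotImplemented(String),
}
type Result<T> = std::result::Result<T, MahoutError>;

trait QuantumEncoder {
    fn name(&self) -> &'static str;

    /// Default implementation: encoders without GPU-pointer support return
    /// NotImplemented, so only AmplitudeEncoder needs an override and
    /// QdpEngine can dispatch uniformly through the trait.
    ///
    /// # Safety
    /// `input_d` must point to at least `input_len` valid f64 elements on
    /// the engine's device for the duration of the call.
    unsafe fn encode_from_gpu_ptr(
        &self,
        _input_d: *const f64,
        _input_len: usize,
        _num_qubits: usize,
    ) -> Result<usize> {
        Err(MahoutError::NotImplemented(format!(
            "{} does not support GPU pointer encoding",
            self.name()
        )))
    }
}

struct AmplitudeEncoder;
impl QuantumEncoder for AmplitudeEncoder {
    fn name(&self) -> &'static str { "amplitude" }

    unsafe fn encode_from_gpu_ptr(
        &self,
        _input_d: *const f64,
        input_len: usize,
        num_qubits: usize,
    ) -> Result<usize> {
        // The real override would absorb the norm computation and kernel
        // launch currently in QdpEngine; here we just return the state size.
        let state_len = 1usize << num_qubits;
        assert!(input_len <= state_len);
        Ok(state_len)
    }
}

struct AngleEncoder;
impl QuantumEncoder for AngleEncoder {
    fn name(&self) -> &'static str { "angle" }
}

fn main() {
    // Amplitude encoding succeeds; angle falls through to the default.
    let ok = unsafe { AmplitudeEncoder.encode_from_gpu_ptr(std::ptr::null(), 4, 3) };
    assert_eq!(ok, Ok(8));
    let err = unsafe { AngleEncoder.encode_from_gpu_ptr(std::ptr::null(), 4, 3) };
    assert!(matches!(err, Err(MahoutError::NotImplemented(_))));
    println!("trait dispatch ok");
}
```

With this shape, `QdpEngine::encode_from_gpu_ptr` would reduce to resolving the encoder by name and forwarding the call.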
##########
qdp/qdp-core/src/lib.rs:
##########
@@ -300,6 +300,269 @@ impl QdpEngine {
encoding_method,
)
}
+
+ /// Encode from existing GPU pointer (zero-copy for CUDA tensors)
+ ///
+ /// This method enables zero-copy encoding from PyTorch CUDA tensors by accepting
+ /// a raw GPU pointer directly, avoiding the GPU→CPU→GPU copy that would otherwise
+ /// be required.
+ ///
+ /// TODO: Refactor to use QuantumEncoder trait (add `encode_from_gpu_ptr` to trait)
+ /// to reduce duplication with AmplitudeEncoder::encode(). This would also make it
+ /// easier to add GPU pointer support for other encoders (angle, basis) in the future.
+ ///
+ /// # Arguments
+ /// * `input_d` - Device pointer to input data (f64 array on GPU)
+ /// * `input_len` - Number of f64 elements in the input
+ /// * `num_qubits` - Number of qubits for encoding
+ /// * `encoding_method` - Strategy (currently only "amplitude" supported)
+ ///
+ /// # Returns
+ /// DLPack pointer for zero-copy PyTorch integration
+ ///
+ /// # Safety
+ /// The input pointer must:
+ /// - Point to valid GPU memory on the same device as the engine
+ /// - Contain at least `input_len` f64 elements
+ /// - Remain valid for the duration of this call
+ #[cfg(target_os = "linux")]
+ pub unsafe fn encode_from_gpu_ptr(
+ &self,
+ input_d: *const f64,
+ input_len: usize,
+ num_qubits: usize,
+ encoding_method: &str,
+ ) -> Result<*mut DLManagedTensor> {
+ crate::profile_scope!("Mahout::EncodeFromGpuPtr");
+
+ if encoding_method != "amplitude" {
+ return Err(MahoutError::NotImplemented(format!(
+ "GPU pointer encoding currently only supports 'amplitude' method, got '{}'",
+ encoding_method
+ )));
+ }
+
+ if input_len == 0 {
+ return Err(MahoutError::InvalidInput(
+ "Input data cannot be empty".into(),
+ ));
+ }
+
+ let state_len = 1usize << num_qubits;
+ if input_len > state_len {
+ return Err(MahoutError::InvalidInput(format!(
+ "Input size {} exceeds state vector size {} (2^{} qubits)",
+ input_len, state_len, num_qubits
+ )));
+ }
+
+ // Allocate output state vector
+ let state_vector = {
+ crate::profile_scope!("GPU::Alloc");
+ gpu::GpuStateVector::new(&self.device, num_qubits)?
+ };
+
+ // Compute inverse L2 norm on GPU
+ let inv_norm = {
+ crate::profile_scope!("GPU::NormFromPtr");
+ // SAFETY: input_d validity is guaranteed by the caller's safety contract
+ unsafe {
+ gpu::AmplitudeEncoder::calculate_inv_norm_gpu(&self.device, input_d, input_len)?
+ }
+ };
+
+ // Get output pointer
+ let state_ptr = state_vector.ptr_f64().ok_or_else(|| {
+ MahoutError::InvalidInput(
+ "State vector precision mismatch (expected float64 buffer)".to_string(),
+ )
+ })?;
+
+ // Launch encoding kernel
+ {
+ crate::profile_scope!("GPU::KernelLaunch");
+ let ret = unsafe {
+ qdp_kernels::launch_amplitude_encode(
+ input_d,
+ state_ptr as *mut std::ffi::c_void,
+ input_len,
+ state_len,
+ inv_norm,
+ std::ptr::null_mut(), // default stream
+ )
+ };
+
+ if ret != 0 {
+ return Err(MahoutError::KernelLaunch(format!(
+ "Amplitude encode kernel failed with error code: {}",
+ ret
+ )));
+ }
+ }
+
+ // Synchronize
+ {
+ crate::profile_scope!("GPU::Synchronize");
+ self.device.synchronize().map_err(|e| {
+ MahoutError::Cuda(format!("CUDA device synchronize failed: {:?}", e))
+ })?;
+ }
+
+ let state_vector = state_vector.to_precision(&self.device, self.precision)?;
+ Ok(state_vector.to_dlpack())
+ }
+
+ /// Encode batch from existing GPU pointer (zero-copy for CUDA tensors)
+ ///
+ /// This method enables zero-copy batch encoding from PyTorch CUDA tensors.
+ ///
+ /// TODO: Refactor to use QuantumEncoder trait (see `encode_from_gpu_ptr` TODO).
+ ///
+ /// # Arguments
+ /// * `input_batch_d` - Device pointer to batch input data (flattened f64 array on GPU)
+ /// * `num_samples` - Number of samples in the batch
+ /// * `sample_size` - Size of each sample in f64 elements
+ /// * `num_qubits` - Number of qubits for encoding
+ /// * `encoding_method` - Strategy (currently only "amplitude" supported)
+ ///
+ /// # Returns
+ /// Single DLPack pointer containing all encoded states (shape: [num_samples, 2^num_qubits])
+ ///
+ /// # Safety
+ /// The input pointer must:
+ /// - Point to valid GPU memory on the same device as the engine
+ /// - Contain at least `num_samples * sample_size` f64 elements
+ /// - Remain valid for the duration of this call
+ #[cfg(target_os = "linux")]
+ pub unsafe fn encode_batch_from_gpu_ptr(
+ &self,
+ input_batch_d: *const f64,
+ num_samples: usize,
+ sample_size: usize,
+ num_qubits: usize,
+ encoding_method: &str,
+ ) -> Result<*mut DLManagedTensor> {
+ crate::profile_scope!("Mahout::EncodeBatchFromGpuPtr");
+
+ if encoding_method != "amplitude" {
+ return Err(MahoutError::NotImplemented(format!(
+ "GPU pointer batch encoding currently only supports 'amplitude' method, got '{}'",
+ encoding_method
+ )));
+ }
+
+ if num_samples == 0 {
+ return Err(MahoutError::InvalidInput(
+ "Number of samples cannot be zero".into(),
+ ));
+ }
+
+ if sample_size == 0 {
+ return Err(MahoutError::InvalidInput(
+ "Sample size cannot be zero".into(),
+ ));
+ }
+
+ let state_len = 1usize << num_qubits;
+ if sample_size > state_len {
+ return Err(MahoutError::InvalidInput(format!(
+ "Sample size {} exceeds state vector size {} (2^{} qubits)",
+ sample_size, state_len, num_qubits
+ )));
+ }
+
+ // Allocate output state vector
+ let batch_state_vector = {
+ crate::profile_scope!("GPU::AllocBatch");
+ gpu::GpuStateVector::new_batch(&self.device, num_samples, num_qubits)?
+ };
+
+ // Compute inverse norms on GPU using warp-reduced kernel
+ let inv_norms_gpu = {
+ crate::profile_scope!("GPU::BatchNormKernel");
+ use cudarc::driver::DevicePtrMut;
+
+ let mut buffer = self.device.alloc_zeros::<f64>(num_samples).map_err(|e| {
+ MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", e))
+ })?;
+
+ let ret = unsafe {
+ qdp_kernels::launch_l2_norm_batch(
+ input_batch_d,
+ num_samples,
+ sample_size,
+ *buffer.device_ptr_mut() as *mut f64,
+ std::ptr::null_mut(), // default stream
+ )
+ };
+
+ if ret != 0 {
+ return Err(MahoutError::KernelLaunch(format!(
Review Comment:
same here
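The batch path above copies the inverse norms back to the host and rejects zero or non-finite values before launching the encode kernel. A self-contained sketch of that validation step (illustrative only; no CUDA involved, and the error type is simplified to `String`):

```rust
/// Reject batches in which any sample has a zero or non-finite inverse norm,
/// mirroring the host-side check performed after the dtoh copy.
fn validate_inv_norms(inv_norms: &[f64]) -> Result<(), String> {
    if inv_norms.iter().any(|v| !v.is_finite() || *v == 0.0) {
        return Err("One or more samples have zero or invalid norm".to_string());
    }
    Ok(())
}

fn main() {
    assert!(validate_inv_norms(&[0.5, 1.0]).is_ok());
    assert!(validate_inv_norms(&[0.5, 0.0]).is_err());      // zero norm (all-zero sample)
    assert!(validate_inv_norms(&[0.5, f64::NAN]).is_err()); // NaN from invalid input
    println!("norm validation ok");
}
```

Catching these on the host turns a silent NaN-filled state vector into an early, descriptive error.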
##########
qdp/qdp-core/src/lib.rs:
##########
@@ -300,6 +300,269 @@ impl QdpEngine {
encoding_method,
)
}
+
+ /// Encode from existing GPU pointer (zero-copy for CUDA tensors)
+ ///
+ /// This method enables zero-copy encoding from PyTorch CUDA tensors by accepting
+ /// a raw GPU pointer directly, avoiding the GPU→CPU→GPU copy that would otherwise
+ /// be required.
+ ///
+ /// TODO: Refactor to use QuantumEncoder trait (add `encode_from_gpu_ptr` to trait)
+ /// to reduce duplication with AmplitudeEncoder::encode(). This would also make it
+ /// easier to add GPU pointer support for other encoders (angle, basis) in the future.
+ ///
+ /// # Arguments
+ /// * `input_d` - Device pointer to input data (f64 array on GPU)
+ /// * `input_len` - Number of f64 elements in the input
+ /// * `num_qubits` - Number of qubits for encoding
+ /// * `encoding_method` - Strategy (currently only "amplitude" supported)
+ ///
+ /// # Returns
+ /// DLPack pointer for zero-copy PyTorch integration
+ ///
+ /// # Safety
+ /// The input pointer must:
+ /// - Point to valid GPU memory on the same device as the engine
+ /// - Contain at least `input_len` f64 elements
+ /// - Remain valid for the duration of this call
+ #[cfg(target_os = "linux")]
+ pub unsafe fn encode_from_gpu_ptr(
+ &self,
+ input_d: *const f64,
+ input_len: usize,
+ num_qubits: usize,
+ encoding_method: &str,
+ ) -> Result<*mut DLManagedTensor> {
+ crate::profile_scope!("Mahout::EncodeFromGpuPtr");
+
+ if encoding_method != "amplitude" {
+ return Err(MahoutError::NotImplemented(format!(
+ "GPU pointer encoding currently only supports 'amplitude' method, got '{}'",
+ encoding_method
+ )));
+ }
+
+ if input_len == 0 {
+ return Err(MahoutError::InvalidInput(
+ "Input data cannot be empty".into(),
+ ));
+ }
+
+ let state_len = 1usize << num_qubits;
+ if input_len > state_len {
+ return Err(MahoutError::InvalidInput(format!(
+ "Input size {} exceeds state vector size {} (2^{} qubits)",
+ input_len, state_len, num_qubits
+ )));
+ }
+
+ // Allocate output state vector
+ let state_vector = {
+ crate::profile_scope!("GPU::Alloc");
+ gpu::GpuStateVector::new(&self.device, num_qubits)?
+ };
+
+ // Compute inverse L2 norm on GPU
+ let inv_norm = {
+ crate::profile_scope!("GPU::NormFromPtr");
+ // SAFETY: input_d validity is guaranteed by the caller's safety contract
+ unsafe {
+ gpu::AmplitudeEncoder::calculate_inv_norm_gpu(&self.device, input_d, input_len)?
+ }
+ };
+
+ // Get output pointer
+ let state_ptr = state_vector.ptr_f64().ok_or_else(|| {
+ MahoutError::InvalidInput(
+ "State vector precision mismatch (expected float64 buffer)".to_string(),
+ )
+ })?;
+
+ // Launch encoding kernel
+ {
+ crate::profile_scope!("GPU::KernelLaunch");
+ let ret = unsafe {
+ qdp_kernels::launch_amplitude_encode(
+ input_d,
+ state_ptr as *mut std::ffi::c_void,
+ input_len,
+ state_len,
+ inv_norm,
+ std::ptr::null_mut(), // default stream
+ )
+ };
+
+ if ret != 0 {
+ return Err(MahoutError::KernelLaunch(format!(
+ "Amplitude encode kernel failed with error code: {}",
+ ret
+ )));
+ }
+ }
+
+ // Synchronize
+ {
+ crate::profile_scope!("GPU::Synchronize");
+ self.device.synchronize().map_err(|e| {
+ MahoutError::Cuda(format!("CUDA device synchronize failed: {:?}", e))
+ })?;
+ }
+
+ let state_vector = state_vector.to_precision(&self.device, self.precision)?;
+ Ok(state_vector.to_dlpack())
+ }
+
+ /// Encode batch from existing GPU pointer (zero-copy for CUDA tensors)
+ ///
+ /// This method enables zero-copy batch encoding from PyTorch CUDA tensors.
+ ///
+ /// TODO: Refactor to use QuantumEncoder trait (see `encode_from_gpu_ptr` TODO).
+ ///
+ /// # Arguments
+ /// * `input_batch_d` - Device pointer to batch input data (flattened f64 array on GPU)
+ /// * `num_samples` - Number of samples in the batch
+ /// * `sample_size` - Size of each sample in f64 elements
+ /// * `num_qubits` - Number of qubits for encoding
+ /// * `encoding_method` - Strategy (currently only "amplitude" supported)
+ ///
+ /// # Returns
+ /// Single DLPack pointer containing all encoded states (shape: [num_samples, 2^num_qubits])
+ ///
+ /// # Safety
+ /// The input pointer must:
+ /// - Point to valid GPU memory on the same device as the engine
+ /// - Contain at least `num_samples * sample_size` f64 elements
+ /// - Remain valid for the duration of this call
+ #[cfg(target_os = "linux")]
+ pub unsafe fn encode_batch_from_gpu_ptr(
+ &self,
+ input_batch_d: *const f64,
+ num_samples: usize,
+ sample_size: usize,
+ num_qubits: usize,
+ encoding_method: &str,
+ ) -> Result<*mut DLManagedTensor> {
+ crate::profile_scope!("Mahout::EncodeBatchFromGpuPtr");
+
+ if encoding_method != "amplitude" {
+ return Err(MahoutError::NotImplemented(format!(
+ "GPU pointer batch encoding currently only supports 'amplitude' method, got '{}'",
+ encoding_method
+ )));
+ }
+
+ if num_samples == 0 {
+ return Err(MahoutError::InvalidInput(
+ "Number of samples cannot be zero".into(),
+ ));
+ }
+
+ if sample_size == 0 {
+ return Err(MahoutError::InvalidInput(
+ "Sample size cannot be zero".into(),
+ ));
+ }
+
+ let state_len = 1usize << num_qubits;
+ if sample_size > state_len {
+ return Err(MahoutError::InvalidInput(format!(
+ "Sample size {} exceeds state vector size {} (2^{} qubits)",
+ sample_size, state_len, num_qubits
+ )));
+ }
+
+ // Allocate output state vector
+ let batch_state_vector = {
+ crate::profile_scope!("GPU::AllocBatch");
+ gpu::GpuStateVector::new_batch(&self.device, num_samples, num_qubits)?
+ };
+
+ // Compute inverse norms on GPU using warp-reduced kernel
+ let inv_norms_gpu = {
+ crate::profile_scope!("GPU::BatchNormKernel");
+ use cudarc::driver::DevicePtrMut;
+
+ let mut buffer = self.device.alloc_zeros::<f64>(num_samples).map_err(|e| {
+ MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", e))
+ })?;
+
+ let ret = unsafe {
+ qdp_kernels::launch_l2_norm_batch(
+ input_batch_d,
+ num_samples,
+ sample_size,
+ *buffer.device_ptr_mut() as *mut f64,
+ std::ptr::null_mut(), // default stream
+ )
+ };
+
+ if ret != 0 {
+ return Err(MahoutError::KernelLaunch(format!(
+ "Norm reduction kernel failed: {}",
+ ret
+ )));
+ }
+
+ buffer
+ };
+
+ // Validate norms on host to catch zero or NaN samples early
+ {
+ crate::profile_scope!("GPU::NormValidation");
+ let host_inv_norms = self
+ .device
+ .dtoh_sync_copy(&inv_norms_gpu)
+ .map_err(|e| MahoutError::Cuda(format!("Failed to copy norms to host: {:?}", e)))?;
+
+ if host_inv_norms.iter().any(|v| !v.is_finite() || *v == 0.0) {
+ return Err(MahoutError::InvalidInput(
+ "One or more samples have zero or invalid norm".to_string(),
+ ));
+ }
+ }
+
+ // Launch batch kernel
+ {
+ crate::profile_scope!("GPU::BatchKernelLaunch");
+ use cudarc::driver::DevicePtr;
+
+ let state_ptr = batch_state_vector.ptr_f64().ok_or_else(|| {
+ MahoutError::InvalidInput(
+ "Batch state vector precision mismatch (expected float64 buffer)".to_string(),
+ )
+ })?;
+
+ let ret = unsafe {
+ qdp_kernels::launch_amplitude_encode_batch(
+ input_batch_d,
+ state_ptr as *mut std::ffi::c_void,
+ *inv_norms_gpu.device_ptr() as *const f64,
+ num_samples,
+ sample_size,
+ state_len,
+ std::ptr::null_mut(), // default stream
+ )
+ };
+
+ if ret != 0 {
+ return Err(MahoutError::KernelLaunch(format!(
Review Comment:
and here
##########
qdp/qdp-core/src/lib.rs:
##########
@@ -300,6 +300,269 @@ impl QdpEngine {
encoding_method,
)
}
+
+ /// Encode from existing GPU pointer (zero-copy for CUDA tensors)
+ ///
+ /// This method enables zero-copy encoding from PyTorch CUDA tensors by accepting
+ /// a raw GPU pointer directly, avoiding the GPU→CPU→GPU copy that would otherwise
+ /// be required.
+ ///
+ /// TODO: Refactor to use QuantumEncoder trait (add `encode_from_gpu_ptr` to trait)
+ /// to reduce duplication with AmplitudeEncoder::encode(). This would also make it
+ /// easier to add GPU pointer support for other encoders (angle, basis) in the future.
+ ///
+ /// # Arguments
+ /// * `input_d` - Device pointer to input data (f64 array on GPU)
+ /// * `input_len` - Number of f64 elements in the input
+ /// * `num_qubits` - Number of qubits for encoding
+ /// * `encoding_method` - Strategy (currently only "amplitude" supported)
+ ///
+ /// # Returns
+ /// DLPack pointer for zero-copy PyTorch integration
+ ///
+ /// # Safety
+ /// The input pointer must:
+ /// - Point to valid GPU memory on the same device as the engine
+ /// - Contain at least `input_len` f64 elements
+ /// - Remain valid for the duration of this call
+ #[cfg(target_os = "linux")]
+ pub unsafe fn encode_from_gpu_ptr(
+ &self,
+ input_d: *const f64,
+ input_len: usize,
+ num_qubits: usize,
+ encoding_method: &str,
+ ) -> Result<*mut DLManagedTensor> {
+ crate::profile_scope!("Mahout::EncodeFromGpuPtr");
+
+ if encoding_method != "amplitude" {
+ return Err(MahoutError::NotImplemented(format!(
+ "GPU pointer encoding currently only supports 'amplitude' method, got '{}'",
+ encoding_method
+ )));
+ }
+
+ if input_len == 0 {
+ return Err(MahoutError::InvalidInput(
+ "Input data cannot be empty".into(),
+ ));
+ }
+
+ let state_len = 1usize << num_qubits;
+ if input_len > state_len {
+ return Err(MahoutError::InvalidInput(format!(
+ "Input size {} exceeds state vector size {} (2^{} qubits)",
+ input_len, state_len, num_qubits
+ )));
+ }
+
+ // Allocate output state vector
+ let state_vector = {
+ crate::profile_scope!("GPU::Alloc");
+ gpu::GpuStateVector::new(&self.device, num_qubits)?
+ };
+
+ // Compute inverse L2 norm on GPU
+ let inv_norm = {
+ crate::profile_scope!("GPU::NormFromPtr");
+ // SAFETY: input_d validity is guaranteed by the caller's safety contract
+ unsafe {
+ gpu::AmplitudeEncoder::calculate_inv_norm_gpu(&self.device, input_d, input_len)?
+ }
+ };
+
+ // Get output pointer
+ let state_ptr = state_vector.ptr_f64().ok_or_else(|| {
+ MahoutError::InvalidInput(
+ "State vector precision mismatch (expected float64 buffer)".to_string(),
+ )
+ })?;
+
+ // Launch encoding kernel
+ {
+ crate::profile_scope!("GPU::KernelLaunch");
+ let ret = unsafe {
+ qdp_kernels::launch_amplitude_encode(
+ input_d,
+ state_ptr as *mut std::ffi::c_void,
+ input_len,
+ state_len,
+ inv_norm,
+ std::ptr::null_mut(), // default stream
+ )
+ };
+
+ if ret != 0 {
+ return Err(MahoutError::KernelLaunch(format!(
Review Comment:
I think we could use the `cuda_error_to_string` function that's already in
the codebase to convert error codes into readable names. `AmplitudeEncoder`
already does this correctly, so the same pattern could be reused here.
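A possible shape for the suggested change. The real `cuda_error_to_string` lives elsewhere in the codebase; the code-to-name mapping below is an illustrative stand-in using a few CUDA runtime `cudaError_t` values:

```rust
// Illustrative stand-in for the codebase's cuda_error_to_string helper.
// Codes follow the CUDA runtime cudaError_t enum (CUDA 10.1+ numbering).
fn cuda_error_to_string(code: i32) -> &'static str {
    match code {
        0 => "cudaSuccess",
        1 => "cudaErrorInvalidValue",
        2 => "cudaErrorMemoryAllocation",
        700 => "cudaErrorIllegalAddress",
        _ => "unknown CUDA error",
    }
}

// Build a KernelLaunch-style message with the readable name alongside the
// raw code, instead of the bare integer currently formatted in QdpEngine.
fn kernel_error_message(ret: i32) -> String {
    format!(
        "Amplitude encode kernel failed: {} (code {})",
        cuda_error_to_string(ret),
        ret
    )
}

fn main() {
    assert_eq!(
        kernel_error_message(700),
        "Amplitude encode kernel failed: cudaErrorIllegalAddress (code 700)"
    );
    println!("{}", kernel_error_message(1));
}
```

Keeping the numeric code in the message preserves greppability while the name makes logs readable at a glance.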
Issue Time Tracking
-------------------
Worklog Id: (was: 1001262)
Time Spent: 4h 40m (was: 4.5h)
> Provide better examples for the parallel ALS recommender code
> -------------------------------------------------------------
>
> Key: MAHOUT-878
> URL: https://issues.apache.org/jira/browse/MAHOUT-878
> Project: Mahout
> Issue Type: Task
> Affects Versions: 1.0.0
> Reporter: Sebastian Schelter
> Assignee: Sebastian Schelter
> Priority: Major
> Fix For: 0.6
>
> Attachments: MAHOUT-878.patch
>
> Time Spent: 4h 40m
> Remaining Estimate: 0h
>
> We should provide examples that show how to apply the parallel ALS
> recommender to the Netflix or KDD2011 datasets.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)