This is an automated email from the ASF dual-hosted git repository.
guanmingchiu pushed a commit to branch dev-qdp
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/dev-qdp by this push:
new ce4b7caba [QDP] add batch kernel support (#700)
ce4b7caba is described below
commit ce4b7caba91680be9c6e4c4b4f27de935fe90fb9
Author: Guan-Ming (Wesley) Chiu <[email protected]>
AuthorDate: Tue Dec 9 19:04:59 2025 +0800
[QDP] add batch kernel support (#700)
* [QDP] Add batch encoding support
* Refactor batch pre-processing
---
qdp/benchmark/benchmark_e2e_final.py | 158 +++++++++++++++++------
qdp/qdp-core/src/error.rs | 3 +
qdp/qdp-core/src/gpu/encodings/amplitude.rs | 191 +++++++++++-----------------
qdp/qdp-core/src/gpu/encodings/mod.rs | 22 ++--
qdp/qdp-core/src/gpu/memory.rs | 42 ++++++
qdp/qdp-core/src/io.rs | 97 +++++++++++++-
qdp/qdp-core/src/lib.rs | 53 +++++---
qdp/qdp-core/src/preprocessing.rs | 55 ++++++++
qdp/qdp-kernels/src/amplitude.cu | 121 ++++++++++++++++++
qdp/qdp-kernels/src/lib.rs | 15 +++
qdp/qdp-python/src/lib.rs | 19 +--
11 files changed, 583 insertions(+), 193 deletions(-)
diff --git a/qdp/benchmark/benchmark_e2e_final.py
b/qdp/benchmark/benchmark_e2e_final.py
index fce668a2c..27efab93d 100644
--- a/qdp/benchmark/benchmark_e2e_final.py
+++ b/qdp/benchmark/benchmark_e2e_final.py
@@ -34,6 +34,7 @@ import torch
import torch.nn as nn
import numpy as np
import os
+import itertools
import pyarrow as pa
import pyarrow.parquet as pq
from mahout_qdp import QdpEngine
@@ -73,10 +74,6 @@ def generate_data(n_qubits, n_samples):
if os.path.exists(DATA_FILE):
os.remove(DATA_FILE)
- MAHOUT_DATA_FILE = DATA_FILE.replace(".parquet", "_mahout.parquet")
- if os.path.exists(MAHOUT_DATA_FILE):
- os.remove(MAHOUT_DATA_FILE)
-
print(f"Generating {n_samples} samples of {n_qubits} qubits...")
dim = 1 << n_qubits
@@ -93,20 +90,9 @@ def generate_data(n_qubits, n_samples):
batch_table = pa.Table.from_arrays([arrays],
names=["feature_vector"])
writer.write_table(batch_table)
- # Generate for Mahout (flat Float64 format, one sample per batch)
- schema_flat = pa.schema([("data", pa.float64())])
- with pq.ParquetWriter(MAHOUT_DATA_FILE, schema_flat) as writer:
- for i in range(n_samples):
- sample_data = np.random.rand(dim).astype(np.float64)
- array = pa.array(sample_data, type=pa.float64())
- batch_table = pa.Table.from_arrays([array], names=["data"])
- writer.write_table(batch_table)
-
file_size_mb = os.path.getsize(DATA_FILE) / (1024 * 1024)
- mahout_size_mb = os.path.getsize(MAHOUT_DATA_FILE) / (1024 * 1024)
print(f" Generated {n_samples} samples")
- print(f" PennyLane/Qiskit format: {file_size_mb:.2f} MB")
- print(f" Mahout format: {mahout_size_mb:.2f} MB")
+ print(f" Parquet file size: {file_size_mb:.2f} MB")
# -----------------------------------------------------------
@@ -115,7 +101,7 @@ def generate_data(n_qubits, n_samples):
def run_qiskit(n_qubits, n_samples):
if not HAS_QISKIT:
print("\n[Qiskit] Not installed, skipping.")
- return 0.0
+ return 0.0, None
print("\n[Qiskit] Full Pipeline (Disk -> GPU)...")
model = DummyQNN(n_qubits).cuda()
@@ -132,6 +118,8 @@ def run_qiskit(n_qubits, n_samples):
io_time = time.perf_counter() - start_time
print(f" IO Time: {io_time:.4f} s")
+ all_qiskit_states = []
+
# Process batches
for i in range(0, n_samples, BATCH_SIZE):
batch = raw_data[i : i + BATCH_SIZE]
@@ -158,12 +146,15 @@ def run_qiskit(n_qubits, n_samples):
gpu_tensor = torch.tensor(
np.array(batch_states), device="cuda", dtype=torch.complex64
)
+ all_qiskit_states.append(gpu_tensor)
_ = model(gpu_tensor.abs())
torch.cuda.synchronize()
total_time = time.perf_counter() - start_time
print(f"\n Total Time: {total_time:.4f} s")
- return total_time
+
+ all_qiskit_tensor = torch.cat(all_qiskit_states, dim=0)
+ return total_time, all_qiskit_tensor
# -----------------------------------------------------------
@@ -172,7 +163,7 @@ def run_qiskit(n_qubits, n_samples):
def run_pennylane(n_qubits, n_samples):
if not HAS_PENNYLANE:
print("\n[PennyLane] Not installed, skipping.")
- return 0.0
+ return 0.0, None
print("\n[PennyLane] Full Pipeline (Disk -> GPU)...")
@@ -198,6 +189,8 @@ def run_pennylane(n_qubits, n_samples):
io_time = time.perf_counter() - start_time
print(f" IO Time: {io_time:.4f} s")
+ all_pl_states = []
+
# Process batches
for i in range(0, n_samples, BATCH_SIZE):
batch_cpu = torch.tensor(raw_data[i : i + BATCH_SIZE])
@@ -208,6 +201,8 @@ def run_pennylane(n_qubits, n_samples):
except Exception:
state_cpu = torch.stack([circuit(x) for x in batch_cpu])
+ all_pl_states.append(state_cpu)
+
# Transfer to GPU
state_gpu = state_cpu.to("cuda", dtype=torch.float32)
_ = model(state_gpu.abs())
@@ -215,7 +210,13 @@ def run_pennylane(n_qubits, n_samples):
torch.cuda.synchronize()
total_time = time.perf_counter() - start_time
print(f" Total Time: {total_time:.4f} s")
- return total_time
+
+ # Stack all collected states
+ all_pl_states_tensor = torch.cat(
+ all_pl_states, dim=0
+ ) # Should handle cases where last batch is smaller
+
+ return total_time, all_pl_states_tensor
# -----------------------------------------------------------
@@ -224,28 +225,31 @@ def run_pennylane(n_qubits, n_samples):
def run_mahout(engine, n_qubits, n_samples):
print("\n[Mahout] Full Pipeline (Disk -> GPU)...")
model = DummyQNN(n_qubits).cuda()
- MAHOUT_DATA_FILE = DATA_FILE.replace(".parquet", "_mahout.parquet")
torch.cuda.synchronize()
start_time = time.perf_counter()
- # Read Parquet and encode all samples
- import pyarrow.parquet as pq
-
- parquet_file = pq.ParquetFile(MAHOUT_DATA_FILE)
+ # Direct Parquet to GPU pipeline
+ parquet_encode_start = time.perf_counter()
+ batched_tensor = engine.encode_from_parquet(DATA_FILE, n_qubits,
"amplitude")
+ parquet_encode_time = time.perf_counter() - parquet_encode_start
+ print(f" Parquet->GPU (IO+Encode): {parquet_encode_time:.4f} s")
- all_states = []
- for batch in parquet_file.iter_batches():
- sample_data = batch.column(0).to_numpy()
- qtensor = engine.encode(sample_data.tolist(), n_qubits, "amplitude")
- gpu_state = torch.from_dlpack(qtensor)
- all_states.append(gpu_state)
+ # Convert to torch tensor (single DLPack call)
+ dlpack_start = time.perf_counter()
+ gpu_batched = torch.from_dlpack(batched_tensor)
+ dlpack_time = time.perf_counter() - dlpack_start
+ print(f" DLPack conversion: {dlpack_time:.4f} s")
- # Stack and convert
- gpu_all_data = torch.stack(all_states).abs().to(torch.float32)
+ # Reshape to [n_samples, state_len] (still complex)
+ state_len = 1 << n_qubits
+ gpu_reshaped = gpu_batched.view(n_samples, state_len)
- encode_time = time.perf_counter() - start_time
- print(f" IO + Encode Time: {encode_time:.4f} s")
+ # Convert to float for model (batch already on GPU)
+ reshape_start = time.perf_counter()
+ gpu_all_data = gpu_reshaped.abs().to(torch.float32)
+ reshape_time = time.perf_counter() - reshape_start
+ print(f" Reshape & convert: {reshape_time:.4f} s")
# Forward pass (data already on GPU)
for i in range(0, n_samples, BATCH_SIZE):
@@ -255,7 +259,46 @@ def run_mahout(engine, n_qubits, n_samples):
torch.cuda.synchronize()
total_time = time.perf_counter() - start_time
print(f" Total Time: {total_time:.4f} s")
- return total_time
+ return total_time, gpu_reshaped
+
+
+def compare_states(name_a, states_a, name_b, states_b):
+ print("\n" + "=" * 70)
+ print(f"VERIFICATION ({name_a} vs {name_b})")
+ print("=" * 70)
+
+ # Ensure both tensors are on GPU for comparison
+ n_compare = min(len(states_a), len(states_b))
+ tensor_a = states_a[:n_compare].cuda()
+ tensor_b = states_b[:n_compare].cuda()
+
+ # Compare Probabilities (|psi|^2)
+ diff_probs = (tensor_a.abs() ** 2 - tensor_b.abs() ** 2).abs().max().item()
+ print(f"Max Probability Difference: {diff_probs:.2e}")
+
+ # Compare Raw Amplitudes
+ # We compare full complex difference magnitude
+ diff_amps = (tensor_a - tensor_b).abs().max().item()
+ print(f"Max Amplitude Difference: {diff_amps:.2e}")
+
+ if diff_probs < 1e-5:
+ print(">> SUCCESS: Quantum States Match!")
+ else:
+ print(">> FAILURE: States do not match.")
+
+
+def verify_correctness(states_dict):
+ # Filter out None values
+ valid_states = {
+ name: states for name, states in states_dict.items() if states is not
None
+ }
+
+ if len(valid_states) < 2:
+ return
+
+ keys = sorted(list(valid_states.keys()))
+ for name_a, name_b in itertools.combinations(keys, 2):
+ compare_states(name_a, valid_states[name_a], name_b,
valid_states[name_b])
if __name__ == "__main__":
@@ -268,8 +311,26 @@ if __name__ == "__main__":
parser.add_argument(
"--samples", type=int, default=200, help="Number of training samples"
)
+ parser.add_argument(
+ "--frameworks",
+ nargs="+",
+ default=["mahout", "pennylane"],
+ choices=["mahout", "pennylane", "qiskit", "all"],
+ help="Frameworks to benchmark (default: mahout pennylane). Use 'all'
to run all available frameworks.",
+ )
args = parser.parse_args()
+ # Expand "all" option
+ if "all" in args.frameworks:
+ all_frameworks = []
+ if "mahout" in args.frameworks or "all" in args.frameworks:
+ all_frameworks.append("mahout")
+ if "pennylane" in args.frameworks or "all" in args.frameworks:
+ all_frameworks.append("pennylane")
+ if "qiskit" in args.frameworks or "all" in args.frameworks:
+ all_frameworks.append("qiskit")
+ args.frameworks = all_frameworks
+
generate_data(args.qubits, args.samples)
try:
@@ -282,10 +343,20 @@ if __name__ == "__main__":
print(f"E2E BENCHMARK: {args.qubits} Qubits, {args.samples} Samples")
print("=" * 70)
+ # Initialize results
+ t_pl, pl_all_states = 0.0, None
+ t_mahout, mahout_all_states = 0.0, None
+ t_qiskit, qiskit_all_states = 0.0, None
+
# Run benchmarks
- t_pl = run_pennylane(args.qubits, args.samples)
- t_qiskit = run_qiskit(args.qubits, args.samples)
- t_mahout = run_mahout(engine, args.qubits, args.samples)
+ if "pennylane" in args.frameworks:
+ t_pl, pl_all_states = run_pennylane(args.qubits, args.samples)
+
+ if "qiskit" in args.frameworks:
+ t_qiskit, qiskit_all_states = run_qiskit(args.qubits, args.samples)
+
+ if "mahout" in args.frameworks:
+ t_mahout, mahout_all_states = run_mahout(engine, args.qubits,
args.samples)
print("\n" + "=" * 70)
print("E2E LATENCY (Lower is Better)")
@@ -311,3 +382,12 @@ if __name__ == "__main__":
print(f"Speedup vs PennyLane: {t_pl / t_mahout:10.2f}x")
if t_qiskit > 0:
print(f"Speedup vs Qiskit: {t_qiskit / t_mahout:10.2f}x")
+
+ # Run Verification after benchmarks
+ verify_correctness(
+ {
+ "Mahout": mahout_all_states,
+ "PennyLane": pl_all_states,
+ "Qiskit": qiskit_all_states,
+ }
+ )
diff --git a/qdp/qdp-core/src/error.rs b/qdp/qdp-core/src/error.rs
index 5cfaabedf..5d7adfbd0 100644
--- a/qdp/qdp-core/src/error.rs
+++ b/qdp/qdp-core/src/error.rs
@@ -36,6 +36,9 @@ pub enum MahoutError {
#[error("I/O error: {0}")]
Io(String),
+
+ #[error("Not implemented: {0}")]
+ NotImplemented(String),
}
/// Result type alias for Mahout operations
diff --git a/qdp/qdp-core/src/gpu/encodings/amplitude.rs
b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
index 6c4277125..ff1490c48 100644
--- a/qdp/qdp-core/src/gpu/encodings/amplitude.rs
+++ b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
@@ -17,7 +17,7 @@
// Amplitude encoding: direct state injection with L2 normalization
use std::sync::Arc;
-use arrow::array::{Array, Float64Array};
+
use cudarc::driver::CudaDevice;
use crate::error::{MahoutError, Result};
use crate::gpu::memory::GpuStateVector;
@@ -29,7 +29,7 @@ use std::ffi::c_void;
#[cfg(target_os = "linux")]
use cudarc::driver::DevicePtr;
#[cfg(target_os = "linux")]
-use qdp_kernels::launch_amplitude_encode;
+use qdp_kernels::{launch_amplitude_encode, launch_amplitude_encode_batch};
#[cfg(target_os = "linux")]
use crate::gpu::memory::{ensure_device_memory_available, map_allocation_error};
@@ -133,140 +133,97 @@ impl QuantumEncoder for AmplitudeEncoder {
}
}
- fn name(&self) -> &'static str {
- "amplitude"
- }
-
- fn description(&self) -> &'static str {
- "Amplitude encoding with L2 normalization"
- }
-
- /// Override to avoid intermediate Vec allocation. Processes chunks
directly to GPU offsets.
- fn encode_chunked(
+ /// Encode multiple samples in a single GPU allocation and kernel launch
+ #[cfg(target_os = "linux")]
+ fn encode_batch(
&self,
device: &Arc<CudaDevice>,
- chunks: &[Float64Array],
+ batch_data: &[f64],
+ num_samples: usize,
+ sample_size: usize,
num_qubits: usize,
) -> Result<GpuStateVector> {
- #[cfg(target_os = "linux")]
- {
- let total_len: usize = chunks.iter().map(|c| c.len()).sum();
- let state_len = 1 << num_qubits;
+ crate::profile_scope!("AmplitudeEncoder::encode_batch");
- if total_len == 0 {
- return Err(MahoutError::InvalidInput("Input chunks cannot be
empty".to_string()));
- }
- if total_len > state_len {
- return Err(MahoutError::InvalidInput(
- format!("Total data length {} exceeds state vector size
{}", total_len, state_len)
- ));
- }
- if num_qubits == 0 || num_qubits > 30 {
- return Err(MahoutError::InvalidInput(
- format!("Number of qubits {} must be between 1 and 30",
num_qubits)
- ));
- }
+ // Validate inputs using shared preprocessor
+ Preprocessor::validate_batch(batch_data, num_samples, sample_size,
num_qubits)?;
- let state_vector = {
- crate::profile_scope!("GPU::Alloc");
- GpuStateVector::new(device, num_qubits)?
- };
-
- // Require pre-processed data (no nulls)
- for chunk in chunks {
- if chunk.null_count() > 0 {
- return Err(MahoutError::InvalidInput(
- format!("Chunk contains {} null values. Data must be
pre-processed before encoding", chunk.null_count())
- ));
- }
- }
+ let state_len = 1 << num_qubits;
- let norm = {
- crate::profile_scope!("CPU::L2Norm");
- let mut norm_sq = 0.0;
- for chunk in chunks {
- norm_sq += chunk.values().iter().map(|&x| x *
x).sum::<f64>();
- }
- let norm = norm_sq.sqrt();
- if norm == 0.0 {
- return Err(MahoutError::InvalidInput("Input data has zero
norm".to_string()));
- }
- norm
+ // Calculate L2 norms using shared preprocessor (parallelized)
+ let norms = Preprocessor::calculate_batch_l2_norms(batch_data,
num_samples, sample_size)?;
+
+ // Convert to inverse norms
+ let inv_norms: Vec<f64> = norms.iter().map(|n| 1.0 / n).collect();
+
+ // Allocate single large GPU buffer for all states
+ let batch_state_vector = {
+ crate::profile_scope!("GPU::AllocBatch");
+ GpuStateVector::new_batch(device, num_samples, num_qubits)?
+ };
+
+ // Upload input data to GPU
+ let input_batch_gpu = {
+ crate::profile_scope!("GPU::H2D_InputBatch");
+ device.htod_sync_copy(batch_data)
+ .map_err(|e| MahoutError::MemoryAllocation(
+ format!("Failed to upload batch input: {:?}", e)
+ ))?
+ };
+
+ // Upload inverse norms to GPU
+ let inv_norms_gpu = {
+ crate::profile_scope!("GPU::H2D_Norms");
+ device.htod_sync_copy(&inv_norms)
+ .map_err(|e| MahoutError::MemoryAllocation(
+ format!("Failed to upload norms: {:?}", e)
+ ))?
+ };
+
+ // Launch batch kernel
+ {
+ crate::profile_scope!("GPU::BatchKernelLaunch");
+ let ret = unsafe {
+ launch_amplitude_encode_batch(
+ *input_batch_gpu.device_ptr() as *const f64,
+ batch_state_vector.ptr() as *mut c_void,
+ *inv_norms_gpu.device_ptr() as *const f64,
+ num_samples,
+ sample_size,
+ state_len,
+ std::ptr::null_mut(), // default stream
+ )
};
- let mut current_offset = 0;
- for chunk in chunks {
- let chunk_len = chunk.len();
- if chunk_len == 0 {
- continue;
- }
-
- let state_ptr_offset = unsafe {
- state_vector.ptr().cast::<u8>()
- .add(current_offset *
std::mem::size_of::<qdp_kernels::CuDoubleComplex>())
- .cast::<std::ffi::c_void>()
- };
-
- let chunk_slice = {
- crate::profile_scope!("GPU::ChunkH2DCopy");
- // Zero-copy from Arrow buffer to GPU
- device.htod_sync_copy(chunk.values())
- .map_err(|e|
MahoutError::MemoryAllocation(format!("Failed to copy chunk: {:?}", e)))?
- };
-
- {
- crate::profile_scope!("GPU::KernelLaunch");
- let ret = unsafe {
- launch_amplitude_encode(
- *chunk_slice.device_ptr() as *const f64,
- state_ptr_offset,
- chunk_len,
- state_len,
- norm,
- std::ptr::null_mut(),
- )
- };
- if ret != 0 {
- return Err(MahoutError::KernelLaunch(
- format!("Kernel launch failed: {} ({})", ret,
cuda_error_to_string(ret))
- ));
- }
- }
-
- current_offset += chunk_len;
- }
-
- if total_len < state_len {
- let padding_bytes = (state_len - total_len) *
std::mem::size_of::<qdp_kernels::CuDoubleComplex>();
- let tail_ptr = unsafe { state_vector.ptr().add(total_len) as
*mut c_void };
-
- unsafe {
- unsafe extern "C" {
- fn cudaMemsetAsync(devPtr: *mut c_void, value: i32,
count: usize, stream: *mut c_void) -> i32;
- }
- let result = cudaMemsetAsync(tail_ptr, 0, padding_bytes,
std::ptr::null_mut());
- if result != 0 {
- return Err(MahoutError::Cuda(
- format!("Failed to zero-fill padding: {} ({})",
result, cuda_error_to_string(result))
- ));
- }
- }
+ if ret != 0 {
+ return Err(MahoutError::KernelLaunch(
+ format!("Batch kernel launch failed: {} ({})", ret,
cuda_error_to_string(ret))
+ ));
}
+ }
+ // Synchronize
+ {
+ crate::profile_scope!("GPU::Synchronize");
device.synchronize()
.map_err(|e| MahoutError::Cuda(format!("Sync failed: {:?}",
e)))?;
-
- Ok(state_vector)
}
- #[cfg(not(target_os = "linux"))]
- {
- Err(MahoutError::Cuda("CUDA unavailable (non-Linux)".to_string()))
- }
+ Ok(batch_state_vector)
+ }
+
+ fn name(&self) -> &'static str {
+ "amplitude"
+ }
+
+ fn description(&self) -> &'static str {
+ "Amplitude encoding with L2 normalization"
}
}
impl AmplitudeEncoder {
+
+
/// Async pipeline encoding for large data (SSS-tier optimization)
///
/// Uses the generic dual-stream pipeline infrastructure to overlap
diff --git a/qdp/qdp-core/src/gpu/encodings/mod.rs
b/qdp/qdp-core/src/gpu/encodings/mod.rs
index 539355c2f..7273179d0 100644
--- a/qdp/qdp-core/src/gpu/encodings/mod.rs
+++ b/qdp/qdp-core/src/gpu/encodings/mod.rs
@@ -17,7 +17,7 @@
// Quantum encoding strategies (Strategy Pattern)
use std::sync::Arc;
-use arrow::array::Float64Array;
+
use cudarc::driver::CudaDevice;
use crate::error::Result;
use crate::gpu::memory::GpuStateVector;
@@ -34,18 +34,18 @@ pub trait QuantumEncoder: Send + Sync {
num_qubits: usize,
) -> Result<GpuStateVector>;
- /// Encode from chunked Arrow arrays
- ///
- /// Default implementation flattens chunks. (TODO: Encoders can override
for true zero-copy.)
- fn encode_chunked(
+ /// Encode multiple samples in a single GPU allocation and kernel launch
(Batch Encoding)
+ fn encode_batch(
&self,
- device: &Arc<CudaDevice>,
- chunks: &[Float64Array],
- num_qubits: usize,
+ _device: &Arc<CudaDevice>,
+ _batch_data: &[f64],
+ _num_samples: usize,
+ _sample_size: usize,
+ _num_qubits: usize,
) -> Result<GpuStateVector> {
- // Default: flatten and use regular encode
- let data = crate::io::arrow_to_vec_chunked(chunks);
- self.encode(device, &data, num_qubits)
+ Err(crate::error::MahoutError::NotImplemented(
+ format!("Batch encoding not implemented for {}", self.name())
+ ))
}
/// Validate input data before encoding
diff --git a/qdp/qdp-core/src/gpu/memory.rs b/qdp/qdp-core/src/gpu/memory.rs
index 1ac8eabb5..a333e103d 100644
--- a/qdp/qdp-core/src/gpu/memory.rs
+++ b/qdp/qdp-core/src/gpu/memory.rs
@@ -220,4 +220,46 @@ impl GpuStateVector {
pub fn size_elements(&self) -> usize {
self.size_elements
}
+
+ /// Create GPU state vector for a batch of samples
+ /// Allocates num_samples * 2^qubits complex numbers on GPU
+ pub fn new_batch(_device: &Arc<CudaDevice>, num_samples: usize, qubits:
usize) -> Result<Self> {
+ let single_state_size: usize = 1usize << qubits;
+ let total_elements = num_samples.checked_mul(single_state_size)
+ .ok_or_else(|| MahoutError::MemoryAllocation(
+ format!("Batch size overflow: {} samples * {} elements",
num_samples, single_state_size)
+ ))?;
+
+ #[cfg(target_os = "linux")]
+ {
+ let requested_bytes = total_elements
+ .checked_mul(std::mem::size_of::<CuDoubleComplex>())
+ .ok_or_else(|| MahoutError::MemoryAllocation(
+ format!("Requested GPU allocation size overflow
(elements={})", total_elements)
+ ))?;
+
+ // Pre-flight check
+ ensure_device_memory_available(requested_bytes, "batch state
vector allocation", Some(qubits))?;
+
+ let slice = unsafe {
+ _device.alloc::<CuDoubleComplex>(total_elements)
+ }.map_err(|e| map_allocation_error(
+ requested_bytes,
+ "batch state vector allocation",
+ Some(qubits),
+ e,
+ ))?;
+
+ Ok(Self {
+ buffer: Arc::new(GpuBufferRaw { slice }),
+ num_qubits: qubits,
+ size_elements: total_elements,
+ })
+ }
+
+ #[cfg(not(target_os = "linux"))]
+ {
+ Err(MahoutError::Cuda("CUDA is only available on Linux. This build
does not support GPU operations.".to_string()))
+ }
+ }
}
diff --git a/qdp/qdp-core/src/io.rs b/qdp/qdp-core/src/io.rs
index fc9f09cd4..93ad1d018 100644
--- a/qdp/qdp-core/src/io.rs
+++ b/qdp/qdp-core/src/io.rs
@@ -22,7 +22,7 @@ use std::fs::File;
use std::path::Path;
use std::sync::Arc;
-use arrow::array::{Array, ArrayRef, Float64Array, RecordBatch};
+use arrow::array::{Array, ArrayRef, Float64Array, ListArray, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
@@ -270,3 +270,98 @@ pub fn write_arrow_to_parquet<P: AsRef<Path>>(
Ok(())
}
+
+/// Read batch data from Parquet file with list column format
+///
+/// Efficiently reads Parquet files where each row contains a list of values.
+/// Returns a flattened Vec with all samples concatenated, suitable for batch
encoding.
+///
+/// # Arguments
+/// * `path` - Path to Parquet file
+///
+/// # Returns
+/// Tuple of (flattened_data, num_samples, sample_size)
+///
+/// # Example
+/// File format: column "feature_vector" with type List<Float64>
+/// Each row = one sample = one list of floats
+pub fn read_parquet_batch<P: AsRef<Path>>(path: P) -> Result<(Vec<f64>, usize,
usize)> {
+ let file = File::open(path.as_ref()).map_err(|e| {
+ MahoutError::Io(format!("Failed to open Parquet file: {}", e))
+ })?;
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
+ MahoutError::Io(format!("Failed to create Parquet reader: {}", e))
+ })?;
+
+ let mut reader = builder.build().map_err(|e| {
+ MahoutError::Io(format!("Failed to build Parquet reader: {}", e))
+ })?;
+
+ let mut all_data = Vec::new();
+ let mut num_samples = 0;
+ let mut sample_size = None;
+
+ while let Some(batch_result) = reader.next() {
+ let batch = batch_result.map_err(|e| {
+ MahoutError::Io(format!("Failed to read Parquet batch: {}", e))
+ })?;
+
+ if batch.num_columns() == 0 {
+ return Err(MahoutError::Io("Parquet file has no
columns".to_string()));
+ }
+
+ let column = batch.column(0);
+
+ // Handle List<Float64> column type
+ if let DataType::List(_) = column.data_type() {
+ let list_array = column
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| MahoutError::Io("Failed to downcast to
ListArray".to_string()))?;
+
+ for i in 0..list_array.len() {
+ let value_array = list_array.value(i);
+ let float_array = value_array
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .ok_or_else(|| MahoutError::Io("List values must be
Float64".to_string()))?;
+
+ let current_size = float_array.len();
+
+ // Verify all samples have the same size
+ if let Some(expected_size) = sample_size {
+ if current_size != expected_size {
+ return Err(MahoutError::InvalidInput(format!(
+ "Inconsistent sample sizes: expected {}, got {}",
+ expected_size, current_size
+ )));
+ }
+ } else {
+ sample_size = Some(current_size);
+ all_data.reserve(current_size * 100); // Reserve space
+ }
+
+ // Efficiently copy the values
+ if float_array.null_count() == 0 {
+ all_data.extend_from_slice(float_array.values());
+ } else {
+ all_data.extend(float_array.iter().map(|opt|
opt.unwrap_or(0.0)));
+ }
+
+ num_samples += 1;
+ }
+ } else {
+ return Err(MahoutError::Io(format!(
+ "Expected List<Float64> column, got {:?}",
+ column.data_type()
+ )));
+ }
+ }
+
+ let sample_size = sample_size.ok_or_else(|| {
+ MahoutError::Io("Parquet file contains no data".to_string())
+ })?;
+
+ Ok((all_data, num_samples, sample_size))
+}
diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs
index 406715edd..65c08bd02 100644
--- a/qdp/qdp-core/src/lib.rs
+++ b/qdp/qdp-core/src/lib.rs
@@ -26,7 +26,7 @@ mod profiling;
pub use error::{MahoutError, Result};
use std::sync::Arc;
-use arrow::array::Float64Array;
+
use cudarc::driver::CudaDevice;
use crate::dlpack::DLManagedTensor;
use crate::gpu::get_encoder;
@@ -88,40 +88,55 @@ impl QdpEngine {
&self.device
}
- /// Encode from chunked Arrow arrays (zero-copy from Parquet)
+ /// Encode multiple samples in a single fused kernel (most efficient)
+ ///
+ /// Allocates one large GPU buffer and launches a single batch kernel.
+ /// This is faster than calling encode() once per sample, since it performs a
+ /// single GPU allocation and a single kernel launch for the whole batch.
///
/// # Arguments
- /// * `chunks` - Chunked Arrow Float64Arrays (from read_parquet_to_arrow)
+ /// * `batch_data` - Flattened batch data (all samples concatenated)
+ /// * `num_samples` - Number of samples in the batch
+ /// * `sample_size` - Size of each sample
/// * `num_qubits` - Number of qubits
- /// * `encoding_method` - Strategy: "amplitude", "angle", or "basis"
+ /// * `encoding_method` - Strategy (currently only "amplitude" supported
for batch)
///
/// # Returns
- /// DLPack pointer for zero-copy PyTorch integration
- pub fn encode_chunked(
+ /// Single DLPack pointer containing all encoded states (shape:
[num_samples, 2^num_qubits])
+ pub fn encode_batch(
&self,
- chunks: &[Float64Array],
+ batch_data: &[f64],
+ num_samples: usize,
+ sample_size: usize,
num_qubits: usize,
encoding_method: &str,
) -> Result<*mut DLManagedTensor> {
- crate::profile_scope!("Mahout::EncodeChunked");
+ crate::profile_scope!("Mahout::EncodeBatch");
let encoder = get_encoder(encoding_method)?;
- let state_vector = encoder.encode_chunked(&self.device, chunks,
num_qubits)?;
- let dlpack_ptr = {
- crate::profile_scope!("Mahout::CreateDLPack");
- state_vector.to_dlpack()
- };
+ let state_vector = encoder.encode_batch(
+ &self.device,
+ batch_data,
+ num_samples,
+ sample_size,
+ num_qubits,
+ )?;
+
+ let dlpack_ptr = state_vector.to_dlpack();
Ok(dlpack_ptr)
}
/// Load data from Parquet file and encode into quantum state
///
- /// **ZERO-COPY**: Reads Parquet chunks directly without intermediate Vec
allocation.
+ /// Reads Parquet file with List<Float64> column format and encodes all
samples
+ /// in a single batch operation. Bypasses pandas for maximum performance.
///
/// # Arguments
/// * `path` - Path to Parquet file
/// * `num_qubits` - Number of qubits
/// * `encoding_method` - Strategy: "amplitude", "angle", or "basis"
+ ///
+ /// # Returns
+ /// Single DLPack pointer containing all encoded states (shape:
[num_samples, 2^num_qubits])
pub fn encode_from_parquet(
&self,
path: &str,
@@ -130,8 +145,14 @@ impl QdpEngine {
) -> Result<*mut DLManagedTensor> {
crate::profile_scope!("Mahout::EncodeFromParquet");
- let chunks = crate::io::read_parquet_to_arrow(path)?;
- self.encode_chunked(&chunks, num_qubits, encoding_method)
+ // Read Parquet directly using Arrow (faster than pandas)
+ let (batch_data, num_samples, sample_size) = {
+ crate::profile_scope!("IO::ReadParquetBatch");
+ crate::io::read_parquet_batch(path)?
+ };
+
+ // Encode using fused batch kernel
+ self.encode_batch(&batch_data, num_samples, sample_size, num_qubits,
encoding_method)
}
}
diff --git a/qdp/qdp-core/src/preprocessing.rs
b/qdp/qdp-core/src/preprocessing.rs
index 233548a0f..0d8e70148 100644
--- a/qdp/qdp-core/src/preprocessing.rs
+++ b/qdp/qdp-core/src/preprocessing.rs
@@ -76,4 +76,59 @@ impl Preprocessor {
Ok(norm)
}
+
+ /// Validates input constraints for batch processing.
+ pub fn validate_batch(
+ batch_data: &[f64],
+ num_samples: usize,
+ sample_size: usize,
+ num_qubits: usize,
+ ) -> Result<()> {
+ if batch_data.len() != num_samples * sample_size {
+ return Err(MahoutError::InvalidInput(
+ format!("Batch data length {} doesn't match num_samples {} *
sample_size {}",
+ batch_data.len(), num_samples, sample_size)
+ ));
+ }
+
+ if num_qubits == 0 || num_qubits > 30 {
+ return Err(MahoutError::InvalidInput(
+ format!("Number of qubits {} must be between 1 and 30",
num_qubits)
+ ));
+ }
+
+ let state_len = 1 << num_qubits;
+ if sample_size > state_len {
+ return Err(MahoutError::InvalidInput(
+ format!("Sample size {} exceeds state vector size {}",
sample_size, state_len)
+ ));
+ }
+
+ Ok(())
+ }
+
+ /// Calculates L2 norms for a batch of samples in parallel.
+ pub fn calculate_batch_l2_norms(
+ batch_data: &[f64],
+ _num_samples: usize,
+ sample_size: usize,
+ ) -> Result<Vec<f64>> {
+ crate::profile_scope!("CPU::BatchL2Norm");
+
+ // Process chunks in parallel using rayon
+ batch_data
+ .par_chunks(sample_size)
+ .enumerate()
+ .map(|(i, sample)| {
+ let norm_sq: f64 = sample.iter().map(|&x| x * x).sum();
+ let norm = norm_sq.sqrt();
+ if norm == 0.0 {
+ return Err(MahoutError::InvalidInput(
+ format!("Sample {} has zero norm", i)
+ ));
+ }
+ Ok(norm)
+ })
+ .collect()
+ }
}
diff --git a/qdp/qdp-kernels/src/amplitude.cu b/qdp/qdp-kernels/src/amplitude.cu
index b1cd06fea..4dd80fb0b 100644
--- a/qdp/qdp-kernels/src/amplitude.cu
+++ b/qdp/qdp-kernels/src/amplitude.cu
@@ -110,6 +110,127 @@ int launch_amplitude_encode(
return (int)cudaGetLastError();
}
+/// Optimized batch amplitude encoding kernel
+///
+/// Memory Layout (row-major):
+/// - input_batch: [sample0_data | sample1_data | ... | sampleN_data]
+/// - state_batch: [sample0_state | sample1_state | ... | sampleN_state]
+///
+/// Optimizations:
+/// 1. Vectorized double2 loads for 128-bit memory transactions
+/// 2. Grid-stride loop for arbitrary batch sizes
+/// 3. Coalesced memory access within warps
+/// 4. Minimized register pressure
+__global__ void amplitude_encode_batch_kernel(
+ const double* __restrict__ input_batch,
+ cuDoubleComplex* __restrict__ state_batch,
+ const double* __restrict__ inv_norms,
+ size_t num_samples,
+ size_t input_len,
+ size_t state_len
+) {
+ // Grid-stride loop pattern for flexibility
+ const size_t elements_per_sample = state_len / 2; // Each thread handles
2 elements
+ const size_t total_work = num_samples * elements_per_sample;
+ const size_t stride = gridDim.x * blockDim.x;
+
+ size_t global_idx = blockIdx.x * blockDim.x + threadIdx.x;
+
+ // Process elements in grid-stride fashion
+ for (size_t idx = global_idx; idx < total_work; idx += stride) {
+ // Decompose linear index into (sample, element_pair)
+ const size_t sample_idx = idx / elements_per_sample;
+ const size_t elem_pair = idx % elements_per_sample;
+
+ // Calculate base addresses (strength-reduced)
+ const size_t input_base = sample_idx * input_len;
+ const size_t state_base = sample_idx * state_len;
+ const size_t elem_offset = elem_pair * 2;
+
+ // Load inverse norm (cached by L1)
+ const double inv_norm = inv_norms[sample_idx];
+
+ // Vectorized load: read 2 doubles as double2 for 128-bit transaction
+ double v1, v2;
+ if (elem_offset + 1 < input_len) {
+ // Aligned vectorized load
+ const double2 vec_data = __ldg(reinterpret_cast<const
double2*>(input_batch + input_base) + elem_pair);
+ v1 = vec_data.x;
+ v2 = vec_data.y;
+ } else if (elem_offset < input_len) {
+ // Edge case: single element load
+ v1 = __ldg(input_batch + input_base + elem_offset);
+ v2 = 0.0;
+ } else {
+ // Padding region
+ v1 = v2 = 0.0;
+ }
+
+ // Normalize and write as complex numbers
+ // Compiler will optimize multiplications
+ const cuDoubleComplex c1 = make_cuDoubleComplex(v1 * inv_norm, 0.0);
+ const cuDoubleComplex c2 = make_cuDoubleComplex(v2 * inv_norm, 0.0);
+
+ // Write to global memory (coalesced within warp)
+ state_batch[state_base + elem_offset] = c1;
+ if (elem_offset + 1 < state_len) {
+ state_batch[state_base + elem_offset + 1] = c2;
+ }
+ }
+}
+
+/// Launch optimized batch amplitude encoding kernel
+///
+/// # Arguments
+/// * input_batch_d - Device pointer to batch input data
+/// * state_batch_d - Device pointer to output batch state vectors
+/// * inv_norms_d - Device pointer to inverse norms array
+/// * num_samples - Number of samples in batch
+/// * input_len - Elements per sample
+/// * state_len - State vector size per sample (2^num_qubits)
+/// * stream - CUDA stream for async execution
+///
+/// # Returns
+/// CUDA error code (0 = cudaSuccess)
+int launch_amplitude_encode_batch(
+ const double* input_batch_d,
+ void* state_batch_d,
+ const double* inv_norms_d,
+ size_t num_samples,
+ size_t input_len,
+ size_t state_len,
+ cudaStream_t stream
+) {
+ if (num_samples == 0 || state_len == 0) {
+ return cudaErrorInvalidValue;
+ }
+
+ cuDoubleComplex* state_complex_d =
static_cast<cuDoubleComplex*>(state_batch_d);
+
+ // Optimal configuration for modern GPUs (SM 7.0+)
+ // - Block size: 256 threads (8 warps, good occupancy)
+ // - Grid size: Enough blocks to saturate GPU, but not excessive
+ const int blockSize = 256;
+ const size_t total_work = num_samples * (state_len / 2);
+
+ // Calculate grid size: aim for high occupancy without too many blocks
+ // Limit to reasonable number of blocks to avoid scheduler overhead
+ const size_t blocks_needed = (total_work + blockSize - 1) / blockSize;
+ const size_t max_blocks = 2048; // Reasonable limit for most GPUs
+ const size_t gridSize = (blocks_needed < max_blocks) ? blocks_needed :
max_blocks;
+
+ amplitude_encode_batch_kernel<<<gridSize, blockSize, 0, stream>>>(
+ input_batch_d,
+ state_complex_d,
+ inv_norms_d,
+ num_samples,
+ input_len,
+ state_len
+ );
+
+ return (int)cudaGetLastError();
+}
+
// TODO: Future encoding methods:
// - launch_angle_encode (angle encoding)
// - launch_basis_encode (basis encoding)
diff --git a/qdp/qdp-kernels/src/lib.rs b/qdp/qdp-kernels/src/lib.rs
index a59733fb8..c1bb07544 100644
--- a/qdp/qdp-kernels/src/lib.rs
+++ b/qdp/qdp-kernels/src/lib.rs
@@ -53,6 +53,21 @@ unsafe extern "C" {
stream: *mut c_void,
) -> i32;
+ /// Launch batch amplitude encoding kernel
+ /// Returns CUDA error code (0 = success)
+ ///
+ /// # Safety
+ /// Requires valid GPU pointers, must sync before freeing
+ pub fn launch_amplitude_encode_batch(
+ input_batch_d: *const f64,
+ state_batch_d: *mut c_void,
+ inv_norms_d: *const f64,
+ num_samples: usize,
+ input_len: usize,
+ state_len: usize,
+ stream: *mut c_void,
+ ) -> i32;
+
// TODO: launch_angle_encode, launch_basis_encode
}
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index 4ff1e4b02..cd3fcc3ac 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -181,25 +181,26 @@ impl QdpEngine {
})
}
- /// Load data from Parquet file and encode into quantum state
+ /// Encode from Parquet file (FASTEST - recommended for batches)
///
- /// **ZERO-COPY**: Reads Parquet chunks directly without intermediate Vec
allocation.
+ /// Direct Parquet→GPU pipeline:
+ /// - Reads List<Float64> column format using Arrow
+ /// - Efficient columnar extraction via Arrow (values flattened into one buffer)
+ /// - Single optimized batch kernel launch
+ /// - Returns batched tensor (shape: [num_samples, 2^num_qubits])
///
/// Args:
/// path: Path to Parquet file
/// num_qubits: Number of qubits for encoding
- /// encoding_method: Encoding strategy ("amplitude", "angle", or
"basis")
+ /// encoding_method: Encoding strategy (currently only "amplitude")
///
/// Returns:
- /// QuantumTensor: DLPack-compatible tensor for zero-copy PyTorch
integration
- ///
- /// Raises:
- /// RuntimeError: If encoding fails
+ /// QuantumTensor: DLPack tensor containing all encoded states
///
/// Example:
/// >>> engine = QdpEngine(device_id=0)
- /// >>> qtensor = engine.encode_from_parquet("data.parquet",
num_qubits=2, encoding_method="amplitude")
- /// >>> torch_tensor = torch.from_dlpack(qtensor)
+ /// >>> batched = engine.encode_from_parquet("data.parquet", 16,
"amplitude")
+ /// >>> torch_tensor = torch.from_dlpack(batched) # Shape: [200,
65536]
fn encode_from_parquet(&self, path: &str, num_qubits: usize,
encoding_method: &str) -> PyResult<QuantumTensor> {
let ptr = self.engine.encode_from_parquet(path, num_qubits,
encoding_method)
.map_err(|e| PyRuntimeError::new_err(format!("Encoding from
parquet failed: {}", e)))?;