This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git
commit 5b6055a472f6cd63d56d04929eff4da95bb46151
Author: Ping <[email protected]>
AuthorDate: Mon Jan 5 21:54:34 2026 +0800

    [QDP] Update benchmark_throughput to batch encoding (#796)

    * [Core] Update throughput benchmark to batch encoding

    Signed-off-by: 400Ping <[email protected]>

    * fix conflict

    Signed-off-by: 400Ping <[email protected]>

    ---------

    Signed-off-by: 400Ping <[email protected]>
---
 qdp/qdp-core/examples/dataloader_throughput.rs   | 51 ++++++++++++------------
 qdp/qdp-python/benchmark/benchmark_throughput.py | 11 +++--
 2 files changed, 30 insertions(+), 32 deletions(-)

diff --git a/qdp/qdp-core/examples/dataloader_throughput.rs b/qdp/qdp-core/examples/dataloader_throughput.rs
index 029caaa1f..d3cb1ea82 100644
--- a/qdp/qdp-core/examples/dataloader_throughput.rs
+++ b/qdp/qdp-core/examples/dataloader_throughput.rs
@@ -29,17 +29,16 @@
 const BATCH_SIZE: usize = 64;
 const VECTOR_LEN: usize = 1024; // 2^10
 const NUM_QUBITS: usize = 10;
 
-fn build_sample(seed: u64) -> Vec<f64> {
+fn fill_sample(seed: u64, out: &mut [f64]) {
     // Lightweight deterministic pattern to keep CPU generation cheap
+    debug_assert_eq!(out.len(), VECTOR_LEN);
     let mask = (VECTOR_LEN - 1) as u64; // power-of-two mask instead of modulo
     let scale = 1.0 / VECTOR_LEN as f64;
-    let mut out = Vec::with_capacity(VECTOR_LEN);
-    for i in 0..VECTOR_LEN {
+    for (i, value) in out.iter_mut().enumerate() {
         let mixed = (i as u64 + seed) & mask;
-        out.push(mixed as f64 * scale);
+        *value = mixed as f64 * scale;
     }
-    out
 }
 
 fn main() {
@@ -77,10 +76,14 @@
 
     let producer = thread::spawn(move || {
         for batch_idx in 0..total_batches {
-            let mut batch = Vec::with_capacity(BATCH_SIZE);
+            let mut batch = vec![0.0f64; BATCH_SIZE * VECTOR_LEN];
             let seed_base = (batch_idx * BATCH_SIZE) as u64;
             for i in 0..BATCH_SIZE {
-                batch.push(build_sample(seed_base + i as u64));
+                let offset = i * VECTOR_LEN;
+                fill_sample(
+                    seed_base + i as u64,
+                    &mut batch[offset..offset + VECTOR_LEN],
+                );
             }
             if tx.send(batch).is_err() {
                 break;
@@ -93,29 +96,25 @@
     let start = Instant::now();
 
     for (batch_idx, batch) in rx.iter().enumerate() {
-        // NOTE: The DataLoader produces host-side batches of size BATCH_SIZE,
-        // but we currently submit each sample to the GPU one-by-one.
-        // From the GPU's perspective this is effectively "batch size = 1"
-        // per encode call; batching is only happening on the host side.
-        for sample in batch {
-            match engine.encode(&sample, NUM_QUBITS, "amplitude") {
-                Ok(ptr) => unsafe {
-                    let managed = &mut *ptr;
-                    if let Some(deleter) = managed.deleter.take() {
-                        deleter(ptr);
-                    }
-                },
-                Err(e) => {
-                    eprintln!(
-                        "Encode failed on batch {} (vector {}): {:?}",
-                        batch_idx, total_vectors, e
-                    );
-                    return;
+        debug_assert_eq!(batch.len() % VECTOR_LEN, 0);
+        let num_samples = batch.len() / VECTOR_LEN;
+        match engine.encode_batch(&batch, num_samples, VECTOR_LEN, NUM_QUBITS, "amplitude") {
+            Ok(ptr) => unsafe {
+                let managed = &mut *ptr;
+                if let Some(deleter) = managed.deleter.take() {
+                    deleter(ptr);
                 }
+            },
+            Err(e) => {
+                eprintln!(
+                    "Encode batch failed on batch {} (processed {} vectors): {:?}",
+                    batch_idx, total_vectors, e
+                );
+                return;
             }
         }
 
-        total_vectors += BATCH_SIZE;
+        total_vectors += num_samples;
 
         if last_report.elapsed() >= report_interval {
             let elapsed = start.elapsed().as_secs_f64().max(1e-6);

diff --git a/qdp/qdp-python/benchmark/benchmark_throughput.py b/qdp/qdp-python/benchmark/benchmark_throughput.py
index 0d7916fec..b37b7db59 100644
--- a/qdp/qdp-python/benchmark/benchmark_throughput.py
+++ b/qdp/qdp-python/benchmark/benchmark_throughput.py
@@ -126,12 +126,11 @@ def run_mahout(num_qubits: int, total_batches: int, batch_size: int, prefetch: i
     for batch in prefetched_batches(
         total_batches, batch_size, 1 << num_qubits, prefetch
     ):
-        normalized = normalize_batch(batch)
-        for sample in normalized:
-            qtensor = engine.encode(sample.tolist(), num_qubits, "amplitude")
-            tensor = torch.utils.dlpack.from_dlpack(qtensor).abs().to(torch.float32)
-            _ = tensor.sum()
-            processed += 1
+        normalized = np.ascontiguousarray(normalize_batch(batch), dtype=np.float64)
+        qtensor = engine.encode_batch(normalized, num_qubits, "amplitude")
+        tensor = torch.utils.dlpack.from_dlpack(qtensor).abs().to(torch.float32)
+        _ = tensor.sum()
+        processed += normalized.shape[0]
     torch.cuda.synchronize()
 
     duration = time.perf_counter() - start
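
For anyone porting other benchmarks to the batched path, the essential shape of
the change is the memory layout: one contiguous host buffer per batch instead of
BATCH_SIZE separate Vec<f64> allocations, with the consumer deriving the sample
count from the flat length. Below is a minimal standalone sketch of that layout;
the engine call itself is only indicated in a comment, since constructing the QDP
engine is outside this diff, and the constants simply mirror the example's.

    const BATCH_SIZE: usize = 64;
    const VECTOR_LEN: usize = 1024;

    // Same deterministic generator as the benchmark: cheap and allocation-free.
    fn fill_sample(seed: u64, out: &mut [f64]) {
        let mask = (VECTOR_LEN - 1) as u64;
        let scale = 1.0 / VECTOR_LEN as f64;
        for (i, value) in out.iter_mut().enumerate() {
            *value = ((i as u64 + seed) & mask) as f64 * scale;
        }
    }

    fn main() {
        // Producer side: one contiguous allocation; sample i occupies
        // batch[i * VECTOR_LEN .. (i + 1) * VECTOR_LEN] (row-major).
        let mut batch = vec![0.0f64; BATCH_SIZE * VECTOR_LEN];
        for i in 0..BATCH_SIZE {
            let offset = i * VECTOR_LEN;
            fill_sample(i as u64, &mut batch[offset..offset + VECTOR_LEN]);
        }

        // Consumer side: recover the sample count from the flat length and
        // submit the whole buffer in a single call, i.e.
        //   engine.encode_batch(&batch, num_samples, VECTOR_LEN, NUM_QUBITS, "amplitude")
        assert_eq!(batch.len() % VECTOR_LEN, 0);
        let num_samples = batch.len() / VECTOR_LEN;
        println!("one flat buffer: {} samples x {} values", num_samples, VECTOR_LEN);
    }

This also collapses the producer's per-batch allocations from BATCH_SIZE + 1 down
to one, keeping host-side generation cheap relative to the GPU work being measured.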
