This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git


The following commit(s) were added to refs/heads/main by this push:
     new 769f08be7 [QDP] Add a Quantum Data Loader and API refactor (#1000)
769f08be7 is described below

commit 769f08be788612bbe3f22064417b3cb8e97023c3
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Mon Feb 2 12:24:24 2026 +0800

    [QDP] Add a Quantum Data Loader and API refactor (#1000)
    
    * Add a Quantum Data Loader and API refactor
    
    * fix error
    
    * fix routes problem and update
    
    * update
    
    * improve according to comments
---
 docs/qdp/python-api.md                             | 200 +++++++++++++++
 qdp/qdp-core/src/gpu/encodings/amplitude.rs        |  25 +-
 qdp/qdp-core/src/lib.rs                            |  51 ++++
 qdp/qdp-core/src/pipeline_runner.rs                | 248 +++++++++++++++++++
 qdp/qdp-python/benchmark/__init__.py               |  17 ++
 qdp/qdp-python/benchmark/api.py                    |  23 ++
 qdp/qdp-python/benchmark/benchmark_latency.py      |  41 ++--
 .../benchmark/benchmark_loader_throughput.py       | 106 ++++++++
 qdp/qdp-python/benchmark/benchmark_throughput.py   |  46 ++--
 qdp/qdp-python/benchmark/loader.py                 |  23 ++
 qdp/qdp-python/benchmark/run_pipeline_baseline.py  |  60 +++--
 qdp/qdp-python/pyproject.toml                      |  11 +
 qdp/qdp-python/qumat_qdp/__init__.py               |  56 +++++
 qdp/qdp-python/qumat_qdp/api.py                    | 156 ++++++++++++
 qdp/qdp-python/qumat_qdp/loader.py                 | 202 ++++++++++++++++
 qdp/qdp-python/src/lib.rs                          | 269 ++++++++++++++++++++-
 testing/qdp/test_benchmark_api.py                  | 115 +++++++++
 testing/qdp/test_bindings.py                       |   9 +-
 18 files changed, 1575 insertions(+), 83 deletions(-)

diff --git a/docs/qdp/python-api.md b/docs/qdp/python-api.md
new file mode 100644
index 000000000..707969e81
--- /dev/null
+++ b/docs/qdp/python-api.md
@@ -0,0 +1,200 @@
+# QDP Python API (qumat_qdp)
+
+Public Python API for QDP: GPU-accelerated encoding, benchmark helpers, and a batch iterator for training or evaluation loops.
+
+## Overview
+
+The **qumat_qdp** package wraps the native extension `_qdp` and adds:
+
+- **Encoding:** `QdpEngine` and `QuantumTensor` for encoding classical data into quantum states, with zero-copy DLPack integration.
+- **Benchmark:** `QdpBenchmark` for throughput/latency runs (full pipeline in Rust, GIL released).
+- **Data loader:** `QuantumDataLoader` for iterating encoded batches one at a time (`for qt in loader:`).
+
+Import from the package:
+
+```python
+from qumat_qdp import (
+    QdpEngine,
+    QuantumTensor,
+    QdpBenchmark,
+    ThroughputResult,
+    LatencyResult,
+    QuantumDataLoader,
+    run_throughput_pipeline_py,
+)
+```
+
+**Requirements:** Linux with NVIDIA GPU (CUDA). Loader and pipeline helpers are stubs on other platforms and raise `RuntimeError`.
+
+---
+
+## Encoding API
+
+### QdpEngine
+
+GPU encoder. Constructor and main methods:
+
+**`QdpEngine(device_id=0, precision="float32")`**
+
+- `device_id` (int): CUDA device ID.
+- `precision` (str): `"float32"` or `"float64"`.
+- Raises `RuntimeError` on init failure or unsupported precision.
+
+**`encode(data, num_qubits, encoding_method="amplitude") -> QuantumTensor`**
+
+- `data`: list of floats, 1D/2D NumPy array (float64, C-contiguous), PyTorch tensor (CPU/CUDA), or file path (`.parquet`, `.arrow`, `.feather`, `.npy`, `.pt`, `.pth`, `.pb`).
+- `num_qubits` (int): Number of qubits.
+- `encoding_method` (str): `"amplitude"` | `"angle"` | `"basis"` | `"iqp"` | `"iqp-z"`.
+- Returns a DLPack-compatible tensor; use `torch.from_dlpack(qtensor)`. Shape `[batch_size, 2^num_qubits]`.
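+
+A minimal usage sketch (illustrative values; assumes a Linux/CUDA build with NumPy and PyTorch installed):
+
+```python
+import numpy as np
+import torch
+from qumat_qdp import QdpEngine
+
+engine = QdpEngine(device_id=0, precision="float32")
+
+# 8 samples of length 2^4; float64 and C-contiguous as required above
+data = np.ascontiguousarray(np.random.rand(8, 16), dtype=np.float64)
+qtensor = engine.encode(data, num_qubits=4, encoding_method="amplitude")
+states = torch.from_dlpack(qtensor)  # shape [8, 2**4], on the GPU
+```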
+
+**`create_synthetic_loader(total_batches, batch_size=64, num_qubits=16, encoding_method="amplitude", seed=None)`**
+
+- Returns an iterator that yields one `QuantumTensor` per batch. GIL is released during each encode. Linux/CUDA only.
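+
+For example (a sketch using the signature above; synthetic data, Linux/CUDA only):
+
+```python
+import torch
+from qumat_qdp import QdpEngine
+
+engine = QdpEngine(device_id=0)
+loader = engine.create_synthetic_loader(
+    total_batches=10, batch_size=32, num_qubits=12, encoding_method="amplitude"
+)
+for qt in loader:
+    batch = torch.from_dlpack(qt)  # [32, 2**12]
+```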
+
+### QuantumTensor
+
+DLPack wrapper for a GPU quantum state.
+
+- **`__dlpack__(stream=None)`:** Returns a DLPack PyCapsule (single use).
+- **`__dlpack_device__()`:** Returns `(device_type, device_id)`; CUDA is `(2, gpu_id)`.
+
+If not consumed, memory is freed when the object is dropped; if consumed (e.g. by PyTorch), ownership transfers to the consumer.
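+
+A consumption sketch (the capsule is single use, so convert once and keep the resulting tensor):
+
+```python
+import torch
+from qumat_qdp import QdpEngine
+
+engine = QdpEngine(device_id=0)
+qt = engine.encode([0.5, 0.5, 0.5, 0.5], num_qubits=2)
+print(qt.__dlpack_device__())   # (2, 0) on CUDA device 0
+states = torch.from_dlpack(qt)  # consumes the capsule; do not convert twice
+```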
+
+---
+
+## Benchmark API
+
+Runs the full encode pipeline in Rust (warmup + timed loop) with GIL released. No Python-side loop.
+
+### QdpBenchmark
+
+Builder; chain methods then call `run_throughput()` or `run_latency()`.
+
+**Constructor:** `QdpBenchmark(device_id=0)`
+
+**Chainable methods:**
+
+| Method | Description |
+|--------|-------------|
+| `qubits(n)` | Number of qubits. |
+| `encoding(method)` | `"amplitude"` \| `"angle"` \| `"basis"`. |
+| `batches(total, size=64)` | Total batches and batch size. |
+| `prefetch(n)` | No-op (API compatibility). |
+| `warmup(n)` | Warmup batch count. |
+
+**`run_throughput() -> ThroughputResult`**
+
+- Requires `qubits` and `batches` to be set.
+- Returns `ThroughputResult` with `duration_sec`, `vectors_per_sec`.
+- Raises `ValueError` if required configuration is missing; `RuntimeError` if the pipeline is unavailable.
+
+**`run_latency() -> LatencyResult`**
+
+- Same pipeline; returns `LatencyResult` with `duration_sec`, `latency_ms_per_vector`.
+
+### Result types
+
+| Type | Fields |
+|------|--------|
+| `ThroughputResult` | `duration_sec`, `vectors_per_sec` |
+| `LatencyResult` | `duration_sec`, `latency_ms_per_vector` |
+
+### Example
+
+```python
+from qumat_qdp import QdpBenchmark, ThroughputResult, LatencyResult
+
+result = (
+    QdpBenchmark(device_id=0)
+    .qubits(16)
+    .encoding("amplitude")
+    .batches(100, size=64)
+    .warmup(2)
+    .run_throughput()
+)
+print(result.vectors_per_sec)
+
+lat = (
+    QdpBenchmark(device_id=0)
+    .qubits(16)
+    .encoding("amplitude")
+    .batches(100, size=64)
+    .run_latency()
+)
+print(lat.latency_ms_per_vector)
+```
+
+---
+
+## Data Loader API
+
+Iterate over encoded batches one at a time. Each batch is a `QuantumTensor`; encoding runs in Rust with GIL released per batch.
+
+### QuantumDataLoader
+
+Builder for a synthetic-data loader. Calling `iter(loader)` (or `for qt in loader`) creates the Rust-backed iterator.
+
+**Constructor:**
+`QuantumDataLoader(device_id=0, num_qubits=16, batch_size=64, total_batches=100, encoding_method="amplitude", seed=None)`
+
+**Chainable methods:**
+
+| Method | Description |
+|--------|-------------|
+| `qubits(n)` | Number of qubits. |
+| `encoding(method)` | `"amplitude"` \| `"angle"` \| `"basis"`. |
+| `batches(total, size=64)` | Total batches and batch size. |
+| `source_synthetic(total_batches=None)` | Synthetic data (default); optional override for total batches. |
+| `seed(s)` | RNG seed for reproducibility. |
+
+**Iteration:** `for qt in loader:` yields a `QuantumTensor` of shape `[batch_size, 2^num_qubits]`. Consume each tensor at most once, e.g. via `torch.from_dlpack(qt)`.
+
+### Example
+
+```python
+from qumat_qdp import QuantumDataLoader
+import torch
+
+loader = (
+    QuantumDataLoader(device_id=0)
+    .qubits(16)
+    .encoding("amplitude")
+    .batches(100, size=64)
+    .source_synthetic()
+)
+
+for qt in loader:
+    batch = torch.from_dlpack(qt)  # [batch_size, 2^16]
+    # use batch ...
+```
+
+---
+
+## Low-level: run_throughput_pipeline_py
+
+Runs the full pipeline in Rust with GIL released. Used by `QdpBenchmark`; can be called directly.
+
+**Signature:**
+`run_throughput_pipeline_py(device_id=0, num_qubits=16, batch_size=64, total_batches=100, encoding_method="amplitude", warmup_batches=0, seed=None) -> tuple[float, float, float]`
+
+**Returns:** `(duration_sec, vectors_per_sec, latency_ms_per_vector)`.
+
+**Raises:** `RuntimeError` on failure or when not available (e.g. non-Linux).
+
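+### Example
+
+A direct call sketch using the documented signature (raises `RuntimeError` on non-Linux builds):
+
+```python
+from qumat_qdp import run_throughput_pipeline_py
+
+duration_sec, vectors_per_sec, latency_ms_per_vector = run_throughput_pipeline_py(
+    device_id=0,
+    num_qubits=16,
+    batch_size=64,
+    total_batches=100,
+    encoding_method="amplitude",
+    warmup_batches=2,
+)
+print(f"{vectors_per_sec:.1f} vec/s, {latency_ms_per_vector:.3f} ms/vector")
+```
+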
+---
+
+## Backward compatibility
+
+`benchmark/api.py` and `benchmark/loader.py` re-export from `qumat_qdp`. Prefer:
+
+- `from qumat_qdp import QdpBenchmark, ThroughputResult, LatencyResult`
+- `from qumat_qdp import QuantumDataLoader`
+
+Benchmark scripts add the project root to `sys.path`, so from the `qdp-python` directory you can run:
+
+```bash
+uv run python benchmark/run_pipeline_baseline.py
+uv run python benchmark/benchmark_loader_throughput.py
+```
+
+without setting `PYTHONPATH`.
diff --git a/qdp/qdp-core/src/gpu/encodings/amplitude.rs b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
index 032418c46..037b3bd31 100644
--- a/qdp/qdp-core/src/gpu/encodings/amplitude.rs
+++ b/qdp/qdp-core/src/gpu/encodings/amplitude.rs
@@ -316,7 +316,7 @@ impl AmplitudeEncoder {
     /// streaming mechanics, while this method focuses on the amplitude
     /// encoding kernel logic.
     #[cfg(target_os = "linux")]
-    fn encode_async_pipeline(
+    pub(crate) fn encode_async_pipeline(
         device: &Arc<CudaDevice>,
         host_data: &[f64],
         _num_qubits: usize,
@@ -548,4 +548,27 @@ impl AmplitudeEncoder {
 
         Ok(inv_norm)
     }
+
+    /// Run dual-stream pipeline for amplitude encoding (exposed for Python / benchmark).
+    #[cfg(target_os = "linux")]
+    pub(crate) fn run_amplitude_dual_stream_pipeline(
+        device: &Arc<CudaDevice>,
+        host_data: &[f64],
+        num_qubits: usize,
+    ) -> Result<()> {
+        Preprocessor::validate_input(host_data, num_qubits)?;
+        let state_len = 1 << num_qubits;
+        let state_vector = GpuStateVector::new(device, num_qubits, Precision::Float64)?;
+        let norm = Preprocessor::calculate_l2_norm(host_data)?;
+        let inv_norm = 1.0 / norm;
+        Self::encode_async_pipeline(
+            device,
+            host_data,
+            num_qubits,
+            state_len,
+            inv_norm,
+            &state_vector,
+        )?;
+        Ok(())
+    }
 }
diff --git a/qdp/qdp-core/src/lib.rs b/qdp/qdp-core/src/lib.rs
index 6e50414a9..1fe172d1b 100644
--- a/qdp/qdp-core/src/lib.rs
+++ b/qdp/qdp-core/src/lib.rs
@@ -35,6 +35,16 @@ mod profiling;
 pub use error::{MahoutError, Result, cuda_error_to_string};
 pub use gpu::memory::Precision;
 
+// Throughput/latency pipeline runner: single path using QdpEngine and encode_batch in Rust.
+#[cfg(target_os = "linux")]
+mod pipeline_runner;
+
+#[cfg(target_os = "linux")]
+pub use pipeline_runner::{
+    DataSource, PipelineConfig, PipelineIterator, PipelineRunResult, run_latency_pipeline,
+    run_throughput_pipeline,
+};
+
 use std::sync::Arc;
 
 use crate::dlpack::DLManagedTensor;
@@ -45,6 +55,7 @@ use cudarc::driver::CudaDevice;
 ///
 /// Manages GPU context and dispatches encoding tasks.
 /// Provides unified interface for device management, memory allocation, and DLPack.
+#[derive(Clone)]
 pub struct QdpEngine {
     device: Arc<CudaDevice>,
     precision: Precision,
@@ -110,6 +121,15 @@ impl QdpEngine {
         &self.device
     }
 
+    /// Block until all GPU work on the default stream has completed.
+    /// Used by the generic pipeline and other callers that need to sync before timing.
+    #[cfg(target_os = "linux")]
+    pub fn synchronize(&self) -> Result<()> {
+        self.device
+            .synchronize()
+            .map_err(|e| MahoutError::Cuda(format!("CUDA device synchronize failed: {:?}", e)))
+    }
+
     /// Encode multiple samples in a single fused kernel (most efficient)
     ///
     /// Allocates one large GPU buffer and launches a single batch kernel.
@@ -148,6 +168,37 @@ impl QdpEngine {
         Ok(dlpack_ptr)
     }
 
+    /// Run dual-stream pipeline for encoding (H2D + kernel overlap). Exposes gpu::pipeline::run_dual_stream_pipeline.
+    /// Currently supports amplitude encoding (1D host_data). Does not return a tensor;
+    /// use for throughput measurement or when the encoded state is not needed.
+    ///
+    /// # Arguments
+    /// * `host_data` - 1D input data (e.g. single sample for amplitude)
+    /// * `num_qubits` - Number of qubits
+    /// * `encoding_method` - Strategy (currently only "amplitude" supported for this path)
+    #[cfg(target_os = "linux")]
+    pub fn run_dual_stream_encode(
+        &self,
+        host_data: &[f64],
+        num_qubits: usize,
+        encoding_method: &str,
+    ) -> Result<()> {
+        crate::profile_scope!("Mahout::RunDualStreamEncode");
+        match encoding_method.to_lowercase().as_str() {
+            "amplitude" => {
+                gpu::encodings::amplitude::AmplitudeEncoder::run_amplitude_dual_stream_pipeline(
+                    &self.device,
+                    host_data,
+                    num_qubits,
+                )
+            }
+            _ => Err(MahoutError::InvalidInput(format!(
+                "run_dual_stream_encode supports only 'amplitude' for now, got '{}'",
+                encoding_method
+            ))),
+        }
+    }
+
     /// Streaming Parquet encoder with multi-threaded IO
     ///
 /// Uses Producer-Consumer pattern: IO thread reads Parquet while GPU processes data.
diff --git a/qdp/qdp-core/src/pipeline_runner.rs b/qdp/qdp-core/src/pipeline_runner.rs
new file mode 100644
index 000000000..9952e26ad
--- /dev/null
+++ b/qdp/qdp-core/src/pipeline_runner.rs
@@ -0,0 +1,248 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Throughput/latency pipeline using QdpEngine and encode_batch. Full loop runs in Rust;
+// Python bindings release GIL during the run.
+
+use std::f64::consts::PI;
+use std::time::Instant;
+
+use crate::QdpEngine;
+use crate::dlpack::DLManagedTensor;
+use crate::error::Result;
+
+/// Configuration for throughput/latency pipeline runs (Python run_throughput_pipeline_py).
+#[derive(Clone, Debug)]
+pub struct PipelineConfig {
+    pub device_id: usize,
+    pub num_qubits: u32,
+    pub batch_size: usize,
+    pub total_batches: usize,
+    pub encoding_method: String,
+    pub seed: Option<u64>,
+    pub warmup_batches: usize,
+}
+
+impl Default for PipelineConfig {
+    fn default() -> Self {
+        Self {
+            device_id: 0,
+            num_qubits: 16,
+            batch_size: 64,
+            total_batches: 100,
+            encoding_method: "amplitude".to_string(),
+            seed: None,
+            warmup_batches: 0,
+        }
+    }
+}
+
+/// Result of a throughput or latency pipeline run.
+#[derive(Clone, Debug)]
+pub struct PipelineRunResult {
+    pub duration_sec: f64,
+    pub vectors_per_sec: f64,
+    pub latency_ms_per_vector: f64,
+}
+
+/// Data source for the pipeline iterator (Phase 1: Synthetic only; Phase 2: File).
+#[derive(Debug)]
+pub enum DataSource {
+    Synthetic {
+        seed: u64,
+        batch_index: usize,
+        total_batches: usize,
+    },
+}
+
+/// Stateful iterator that yields one batch DLPack at a time for Python `for` loop consumption.
+/// Holds a clone of QdpEngine, PipelineConfig, and source state; reuses generate_batch and encode_batch.
+pub struct PipelineIterator {
+    engine: QdpEngine,
+    config: PipelineConfig,
+    source: DataSource,
+    vector_len: usize,
+}
+
+impl PipelineIterator {
+    /// Create a new synthetic-data pipeline iterator.
+    pub fn new_synthetic(engine: QdpEngine, config: PipelineConfig) -> Result<Self> {
+        let vector_len = vector_len(config.num_qubits, &config.encoding_method);
+        let source = DataSource::Synthetic {
+            seed: config.seed.unwrap_or(0),
+            batch_index: 0,
+            total_batches: config.total_batches,
+        };
+        Ok(Self {
+            engine,
+            config,
+            source,
+            vector_len,
+        })
+    }
+
+    /// Returns the next batch as a DLPack pointer; `Ok(None)` when exhausted.
+    pub fn next_batch(&mut self) -> Result<Option<*mut DLManagedTensor>> {
+        let (batch_data, num_qubits) = match &mut self.source {
+            DataSource::Synthetic {
+                batch_index,
+                total_batches,
+                ..
+            } => {
+                if *batch_index >= *total_batches {
+                    return Ok(None);
+                }
+                let data = generate_batch(&self.config, *batch_index, self.vector_len);
+                *batch_index += 1;
+                (data, self.config.num_qubits as usize)
+            }
+        };
+        let ptr = self.engine.encode_batch(
+            &batch_data,
+            self.config.batch_size,
+            self.vector_len,
+            num_qubits,
+            &self.config.encoding_method,
+        )?;
+        Ok(Some(ptr))
+    }
+}
+
+/// Vector length per sample for given encoding (used by pipeline and iterator).
+pub fn vector_len(num_qubits: u32, encoding_method: &str) -> usize {
+    let n = num_qubits as usize;
+    match encoding_method.to_lowercase().as_str() {
+        "angle" => n,
+        "basis" => 1,
+        _ => 1 << n, // amplitude
+    }
+}
+
+/// Deterministic sample generation matching Python utils.build_sample (amplitude/angle/basis).
+fn fill_sample(seed: u64, out: &mut [f64], encoding_method: &str) -> Result<()> {
+    let len = out.len();
+    if len == 0 {
+        return Ok(());
+    }
+    match encoding_method.to_lowercase().as_str() {
+        "basis" => {
+            let mask = len.saturating_sub(1) as u64;
+            let idx = seed & mask;
+            out[0] = idx as f64;
+        }
+        "angle" => {
+            let scale = (2.0 * PI) / len as f64;
+            for (i, v) in out.iter_mut().enumerate() {
+                let mixed = (i as u64 + seed) % (len as u64);
+                *v = mixed as f64 * scale;
+            }
+        }
+        _ => {
+            // amplitude
+            let mask = (len - 1) as u64;
+            let scale = 1.0 / len as f64;
+            for (i, v) in out.iter_mut().enumerate() {
+                let mixed = (i as u64 + seed) & mask;
+                *v = mixed as f64 * scale;
+            }
+        }
+    }
+    Ok(())
+}
+
+/// Generate one batch (batch_size * vector_len elements, or batch_size * 1 for basis).
+fn generate_batch(config: &PipelineConfig, batch_idx: usize, vector_len: usize) -> Vec<f64> {
+    let seed_base = config
+        .seed
+        .unwrap_or(0)
+        .wrapping_add((batch_idx * config.batch_size) as u64);
+    let mut batch = vec![0.0f64; config.batch_size * vector_len];
+    for i in 0..config.batch_size {
+        let offset = i * vector_len;
+        let _ = fill_sample(
+            seed_base + i as u64,
+            &mut batch[offset..offset + vector_len],
+            &config.encoding_method,
+        );
+    }
+    batch
+}
+
+/// Release DLPack tensor (call deleter so GPU memory is freed).
+unsafe fn release_dlpack(ptr: *mut DLManagedTensor) {
+    if ptr.is_null() {
+        return;
+    }
+    let managed = unsafe { &mut *ptr };
+    if let Some(deleter) = managed.deleter.take() {
+        unsafe { deleter(ptr) };
+    }
+}
+
+/// Run throughput pipeline: warmup, then timed encode_batch loop; returns stats.
+pub fn run_throughput_pipeline(config: &PipelineConfig) -> Result<PipelineRunResult> {
+    let engine = QdpEngine::new(config.device_id)?;
+    let vector_len = vector_len(config.num_qubits, &config.encoding_method);
+    let num_qubits = config.num_qubits as usize;
+
+    // Warmup
+    for b in 0..config.warmup_batches {
+        let batch = generate_batch(config, b, vector_len);
+        let ptr = engine.encode_batch(
+            &batch,
+            config.batch_size,
+            vector_len,
+            num_qubits,
+            &config.encoding_method,
+        )?;
+        unsafe { release_dlpack(ptr) };
+    }
+
+    #[cfg(target_os = "linux")]
+    engine.synchronize()?;
+
+    let start = Instant::now();
+    for b in 0..config.total_batches {
+        let batch = generate_batch(config, b, vector_len);
+        let ptr = engine.encode_batch(
+            &batch,
+            config.batch_size,
+            vector_len,
+            num_qubits,
+            &config.encoding_method,
+        )?;
+        unsafe { release_dlpack(ptr) };
+    }
+
+    #[cfg(target_os = "linux")]
+    engine.synchronize()?;
+
+    let duration_sec = start.elapsed().as_secs_f64().max(1e-9);
+    let total_vectors = config.total_batches * config.batch_size;
+    let vectors_per_sec = total_vectors as f64 / duration_sec;
+    let latency_ms_per_vector = (duration_sec / total_vectors as f64) * 1000.0;
+
+    Ok(PipelineRunResult {
+        duration_sec,
+        vectors_per_sec,
+        latency_ms_per_vector,
+    })
+}
+
+/// Run latency pipeline (same as throughput; returns same stats; name for API parity).
+pub fn run_latency_pipeline(config: &PipelineConfig) -> Result<PipelineRunResult> {
+    run_throughput_pipeline(config)
+}
diff --git a/qdp/qdp-python/benchmark/__init__.py b/qdp/qdp-python/benchmark/__init__.py
new file mode 100644
index 000000000..1ba132fde
--- /dev/null
+++ b/qdp/qdp-python/benchmark/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Benchmark scripts and utilities for QDP Python.
diff --git a/qdp/qdp-python/benchmark/api.py b/qdp/qdp-python/benchmark/api.py
new file mode 100644
index 000000000..b2cbda019
--- /dev/null
+++ b/qdp/qdp-python/benchmark/api.py
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Re-export benchmark API from qumat_qdp. Prefer: from qumat_qdp import 
QdpBenchmark, ThroughputResult, LatencyResult."""
+
+from __future__ import annotations
+
+from qumat_qdp import LatencyResult, QdpBenchmark, ThroughputResult
+
+__all__ = ["LatencyResult", "QdpBenchmark", "ThroughputResult"]
diff --git a/qdp/qdp-python/benchmark/benchmark_latency.py b/qdp/qdp-python/benchmark/benchmark_latency.py
index 6e692b63b..c22ad774e 100644
--- a/qdp/qdp-python/benchmark/benchmark_latency.py
+++ b/qdp/qdp-python/benchmark/benchmark_latency.py
@@ -18,8 +18,8 @@
 """
 Data-to-State latency benchmark: CPU RAM -> GPU VRAM.
 
-Run:
-    python qdp/qdp-python/benchmark/benchmark_latency.py --qubits 16 \
+Run from qdp-python directory (qumat_qdp must be importable, e.g. via uv):
+    uv run python benchmark/benchmark_latency.py --qubits 16 \\
         --batches 200 --batch-size 64 --prefetch 16
 """
 
@@ -30,8 +30,8 @@ import time
 
 import torch
 
-from _qdp import QdpEngine
-from utils import normalize_batch, prefetched_batches
+from benchmark.utils import normalize_batch, prefetched_batches
+from qumat_qdp import QdpBenchmark
 
 BAR = "=" * 70
 SEP = "-" * 70
@@ -92,30 +92,25 @@ def run_mahout(
     prefetch: int,
     encoding_method: str = "amplitude",
 ):
+    """Run Mahout latency using the generic user API (QdpBenchmark)."""
     try:
-        engine = QdpEngine(0)
+        result = (
+            QdpBenchmark(device_id=0)
+            .qubits(num_qubits)
+            .encoding(encoding_method)
+            .batches(total_batches, size=batch_size)
+            .prefetch(prefetch)
+            .run_latency()
+        )
     except Exception as exc:
         print(f"[Mahout] Init failed: {exc}")
         return 0.0, 0.0
 
-    vector_len = num_qubits if encoding_method == "angle" else (1 << num_qubits)
-    sync_cuda()
-    start = time.perf_counter()
-    processed = 0
-
-    for batch in prefetched_batches(
-        total_batches, batch_size, vector_len, prefetch, encoding_method
-    ):
-        normalized = normalize_batch(batch, encoding_method)
-        qtensor = engine.encode(normalized, num_qubits, encoding_method)
-        _ = torch.utils.dlpack.from_dlpack(qtensor)
-        processed += normalized.shape[0]
-
-    sync_cuda()
-    duration = time.perf_counter() - start
-    latency_ms = (duration / processed) * 1000 if processed > 0 else 0.0
-    print(f"  Total Time: {duration:.4f} s ({latency_ms:.3f} ms/vector)")
-    return duration, latency_ms
+    print(
+        f"  Total Time: {result.duration_sec:.4f} s "
+        f"({result.latency_ms_per_vector:.3f} ms/vector)"
+    )
+    return result.duration_sec, result.latency_ms_per_vector
 
 
 def run_pennylane(num_qubits: int, total_batches: int, batch_size: int, prefetch: int):
diff --git a/qdp/qdp-python/benchmark/benchmark_loader_throughput.py b/qdp/qdp-python/benchmark/benchmark_loader_throughput.py
new file mode 100644
index 000000000..223e68555
--- /dev/null
+++ b/qdp/qdp-python/benchmark/benchmark_loader_throughput.py
@@ -0,0 +1,106 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Throughput benchmark using QuantumDataLoader (for qt in loader).
+
+Compares iterator-based throughput with run_throughput_pipeline_py.
+Expectation: the loader version is slightly slower due to the per-batch Python boundary.
+
+Run from qdp-python directory (qumat_qdp must be importable, e.g. via uv):
+  uv run python benchmark/benchmark_loader_throughput.py --qubits 16 --batches 200 --batch-size 64
+"""
+
+from __future__ import annotations
+
+import argparse
+import time
+
+from qumat_qdp import QuantumDataLoader, QdpBenchmark
+
+
+def run_loader_throughput(
+    num_qubits: int,
+    total_batches: int,
+    batch_size: int,
+    encoding_method: str = "amplitude",
+) -> tuple[float, float]:
+    """Run throughput by iterating QuantumDataLoader; returns (duration_sec, 
vectors_per_sec)."""
+    loader = (
+        QuantumDataLoader(device_id=0)
+        .qubits(num_qubits)
+        .encoding(encoding_method)
+        .batches(total_batches, size=batch_size)
+        .source_synthetic()
+    )
+    total_vectors = total_batches * batch_size
+    start = time.perf_counter()
+    count = 0
+    for qt in loader:
+        count += 1
+        # Consumer stand-in: a real consumer would call torch.from_dlpack(qt) and use it
+        _ = qt
+    elapsed = time.perf_counter() - start
+    if count != total_batches:
+        raise RuntimeError(f"Expected {total_batches} batches, got {count}")
+    duration_sec = max(elapsed, 1e-9)
+    vectors_per_sec = total_vectors / duration_sec
+    return duration_sec, vectors_per_sec
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(
+        description="QuantumDataLoader throughput benchmark"
+    )
+    parser.add_argument("--qubits", type=int, default=16)
+    parser.add_argument("--batches", type=int, default=200)
+    parser.add_argument("--batch-size", type=int, default=64)
+    parser.add_argument("--encoding", type=str, default="amplitude")
+    parser.add_argument("--trials", type=int, default=3)
+    args = parser.parse_args()
+
+    print("QuantumDataLoader throughput (for qt in loader)")
+    print(
+        f"  qubits={args.qubits}, batches={args.batches}, batch_size={args.batch_size}"
+    )
+
+    loader_times: list[float] = []
+    for t in range(args.trials):
+        dur, vps = run_loader_throughput(
+            args.qubits, args.batches, args.batch_size, args.encoding
+        )
+        loader_times.append(vps)
+        print(f"  Trial {t + 1}: {dur:.4f} s, {vps:.1f} vec/s")
+
+    median_vps = sorted(loader_times)[len(loader_times) // 2]
+    print(f"  Median: {median_vps:.1f} vec/s")
+
+    # Compare with full Rust pipeline (single boundary)
+    print("\nQdpBenchmark.run_throughput() (full Rust pipeline, single boundary):")
+    result = (
+        QdpBenchmark(device_id=0)
+        .qubits(args.qubits)
+        .encoding(args.encoding)
+        .batches(args.batches, size=args.batch_size)
+        .run_throughput()
+    )
+    print(f"  {result.duration_sec:.4f} s, {result.vectors_per_sec:.1f} vec/s")
+    print(f"\nLoader vs pipeline ratio: {median_vps / result.vectors_per_sec:.3f}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/qdp/qdp-python/benchmark/benchmark_throughput.py b/qdp/qdp-python/benchmark/benchmark_throughput.py
index 15c67646f..acb7e067d 100644
--- a/qdp/qdp-python/benchmark/benchmark_throughput.py
+++ b/qdp/qdp-python/benchmark/benchmark_throughput.py
@@ -23,8 +23,8 @@ The workload mirrors the `qdp-core/examples/dataloader_throughput.rs` pipeline:
 - Prefetch on the CPU side to keep the GPU fed.
 - Encode vectors into amplitude states on GPU and run a tiny consumer op.
 
-Run:
-    python qdp/benchmark/benchmark_throughput.py --qubits 16 --batches 200 --batch-size 64
+Run from qdp-python directory (qumat_qdp must be importable, e.g. via uv):
+    uv run python benchmark/benchmark_throughput.py --qubits 16 --batches 200 --batch-size 64
 """
 
 import argparse
@@ -33,8 +33,8 @@ import time
 import numpy as np
 import torch
 
-from _qdp import QdpEngine
-from utils import normalize_batch, prefetched_batches
+from benchmark.utils import normalize_batch, prefetched_batches
+from qumat_qdp import QdpBenchmark
 
 BAR = "=" * 70
 SEP = "-" * 70
@@ -83,34 +83,26 @@ def run_mahout(
     prefetch: int,
     encoding_method: str = "amplitude",
 ):
+    """Run Mahout throughput using the generic user API (QdpBenchmark)."""
     try:
-        engine = QdpEngine(0)
+        result = (
+            QdpBenchmark(device_id=0)
+            .qubits(num_qubits)
+            .encoding(encoding_method)
+            .batches(total_batches, size=batch_size)
+            .prefetch(prefetch)
+            .run_throughput()
+        )
     except Exception as exc:
         print(f"[Mahout] Init failed: {exc}")
         return 0.0, 0.0
 
-    torch.cuda.synchronize()
-    start = time.perf_counter()
-
-    vector_len = num_qubits if encoding_method == "angle" else (1 << num_qubits)
-    processed = 0
-    for batch in prefetched_batches(
-        total_batches, batch_size, vector_len, prefetch, encoding_method
-    ):
-        normalized = np.ascontiguousarray(
-            normalize_batch(batch, encoding_method), dtype=np.float64
-        )
-        qtensor = engine.encode(normalized, num_qubits, encoding_method)
-        tensor = torch.from_dlpack(qtensor).abs().to(torch.float32)
-        _ = tensor.sum()
-        processed += normalized.shape[0]
-
-    torch.cuda.synchronize()
-    duration = time.perf_counter() - start
-    throughput = processed / duration if duration > 0 else 0.0
-    print(f"  IO + Encode Time: {duration:.4f} s")
-    print(f"  Total Time: {duration:.4f} s ({throughput:.1f} vectors/sec)")
-    return duration, throughput
+    print(f"  IO + Encode Time: {result.duration_sec:.4f} s")
+    print(
+        f"  Total Time: {result.duration_sec:.4f} s "
+        f"({result.vectors_per_sec:.1f} vectors/sec)"
+    )
+    return result.duration_sec, result.vectors_per_sec
 
 
 def run_pennylane(num_qubits: int, total_batches: int, batch_size: int, prefetch: int):
diff --git a/qdp/qdp-python/benchmark/loader.py b/qdp/qdp-python/benchmark/loader.py
new file mode 100644
index 000000000..0b6988eed
--- /dev/null
+++ b/qdp/qdp-python/benchmark/loader.py
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Re-export QuantumDataLoader from qumat_qdp. Prefer: from qumat_qdp import 
QuantumDataLoader."""
+
+from __future__ import annotations
+
+from qumat_qdp import QuantumDataLoader
+
+__all__ = ["QuantumDataLoader"]
diff --git a/qdp/qdp-python/benchmark/run_pipeline_baseline.py b/qdp/qdp-python/benchmark/run_pipeline_baseline.py
index c813c8333..e6466c901 100644
--- a/qdp/qdp-python/benchmark/run_pipeline_baseline.py
+++ b/qdp/qdp-python/benchmark/run_pipeline_baseline.py
@@ -22,14 +22,26 @@ Runs throughput and latency benchmarks multiple times (default 5), computes
 median/p95, gathers system metadata, and writes CSV + markdown report to
 qdp/docs/optimization/results/.
 
+Uses the Rust-optimized pipeline only (qumat_qdp.QdpBenchmark -> _qdp.run_throughput_pipeline_py).
+No Python for loop; all scheduling is in Rust.
+
 Set observability before running (recommended):
   export QDP_ENABLE_POOL_METRICS=1
   export QDP_ENABLE_OVERLAP_TRACKING=1
   export RUST_LOG=info
 
-Usage:
-  cd qdp/qdp-python/benchmark
-  uv run python run_pipeline_baseline.py --qubits 16 --batch-size 64 --prefetch 16 --batches 500 --trials 20
+Usage (from qdp-python):
+  cd qdp/qdp-python
+  uv run python benchmark/run_pipeline_baseline.py --qubits 16 --batch-size 64 --batches 500 --trials 20
+
+If you see "run_throughput_pipeline_py is missing", uv is using a cached wheel. Force a rebuild:
+  uv sync --refresh-package qumat-qdp
+  uv run python benchmark/run_pipeline_baseline.py ...
+
+Alternatively build and run without uv run:
+  maturin develop
+  .venv/bin/python benchmark/run_pipeline_baseline.py ...
+  # or: ./benchmark/run_baseline.sh ...
 """
 
 from __future__ import annotations
@@ -48,12 +60,16 @@ os.environ.setdefault("QDP_ENABLE_POOL_METRICS", "1")
 os.environ.setdefault("QDP_ENABLE_OVERLAP_TRACKING", "1")
 os.environ.setdefault("RUST_LOG", "info")
 
-from benchmark_latency import run_mahout as run_mahout_latency
-from benchmark_throughput import run_mahout as run_mahout_throughput
+# Add project root to path so qumat_qdp is importable when run as a script
+_benchmark_dir = Path(__file__).resolve().parent
+_project_root = _benchmark_dir.parent
+if str(_project_root) not in sys.path:
+    sys.path.insert(0, str(_project_root))
+
+from qumat_qdp import QdpBenchmark  # noqa: E402
 
 
 def _repo_root() -> Path:
-    # benchmark -> qdp-python -> qdp -> mahout (workspace root)
     return Path(__file__).resolve().parent.parent.parent.parent
 
 
@@ -116,13 +132,19 @@ def run_throughput_trials(
     trials: int,
     encoding: str,
 ) -> list[float]:
+    """Run throughput trials using the generic user API (QdpBenchmark)."""
     throughputs: list[float] = []
-    for i in range(trials):
-        _duration, throughput = run_mahout_throughput(
-            qubits, batches, batch_size, prefetch, encoding
+    for _ in range(trials):
+        result = (
+            QdpBenchmark(device_id=0)
+            .qubits(qubits)
+            .encoding(encoding)
+            .batches(batches, size=batch_size)
+            .prefetch(prefetch)
+            .run_throughput()
         )
-        if throughput > 0:
-            throughputs.append(throughput)
+        if result.vectors_per_sec > 0:
+            throughputs.append(result.vectors_per_sec)
     return throughputs
 
 
@@ -134,13 +156,19 @@ def run_latency_trials(
     trials: int,
     encoding: str,
 ) -> list[float]:
+    """Run latency trials using the generic user API (QdpBenchmark)."""
     latencies_ms: list[float] = []
-    for i in range(trials):
-        _duration, latency_ms = run_mahout_latency(
-            qubits, batches, batch_size, prefetch, encoding
+    for _ in range(trials):
+        result = (
+            QdpBenchmark(device_id=0)
+            .qubits(qubits)
+            .encoding(encoding)
+            .batches(batches, size=batch_size)
+            .prefetch(prefetch)
+            .run_latency()
         )
-        if latency_ms > 0:
-            latencies_ms.append(latency_ms)
+        if result.latency_ms_per_vector > 0:
+            latencies_ms.append(result.latency_ms_per_vector)
     return latencies_ms
 
 
diff --git a/qdp/qdp-python/pyproject.toml b/qdp/qdp-python/pyproject.toml
index 72f663a5a..f7fd659d6 100644
--- a/qdp/qdp-python/pyproject.toml
+++ b/qdp/qdp-python/pyproject.toml
@@ -38,5 +38,16 @@ name = "pytorch"
 url = "https://download.pytorch.org/whl/cu122";
 explicit = true
 
+# Invalidate uv cache when Rust or Cargo changes so extension is rebuilt (run_throughput_pipeline_py etc.).
+# Ref: https://docs.astral.sh/uv/concepts/cache/#dynamic-metadata
+[tool.uv]
+cache-keys = [
+    { file = "pyproject.toml" },
+    { file = "Cargo.toml" },
+    { file = "src/**" },
+]
+
 [tool.maturin]
 module-name = "_qdp"
+# Package at project root (qumat_qdp/) so editable install and uv run find it
+python-source = "."
diff --git a/qdp/qdp-python/qumat_qdp/__init__.py b/qdp/qdp-python/qumat_qdp/__init__.py
new file mode 100644
index 000000000..79d1cecdd
--- /dev/null
+++ b/qdp/qdp-python/qumat_qdp/__init__.py
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+QDP (Quantum Data Processing) Python API.
+
+Public API: QdpEngine, QuantumTensor (Rust extension _qdp),
+QdpBenchmark, ThroughputResult, LatencyResult (benchmark API),
+QuantumDataLoader (data loader iterator).
+
+Usage:
+    from qumat_qdp import QdpEngine, QuantumTensor
+    from qumat_qdp import QdpBenchmark, ThroughputResult, LatencyResult
+    from qumat_qdp import QuantumDataLoader
+"""
+
+from __future__ import annotations
+
+# Rust extension (built by maturin). QdpEngine/QuantumTensor are public for
+# advanced use; QdpBenchmark and QuantumDataLoader are the recommended high-level API.
+import _qdp
+
+from qumat_qdp.api import (
+    LatencyResult,
+    QdpBenchmark,
+    ThroughputResult,
+)
+from qumat_qdp.loader import QuantumDataLoader
+
+# Re-export Rust extension types
+QdpEngine = _qdp.QdpEngine
+QuantumTensor = _qdp.QuantumTensor
+run_throughput_pipeline_py = getattr(_qdp, "run_throughput_pipeline_py", None)
+
+__all__ = [
+    "LatencyResult",
+    "QdpBenchmark",
+    "QdpEngine",
+    "QuantumDataLoader",
+    "QuantumTensor",
+    "ThroughputResult",
+    "run_throughput_pipeline_py",
+]
diff --git a/qdp/qdp-python/qumat_qdp/api.py b/qdp/qdp-python/qumat_qdp/api.py
new file mode 100644
index 000000000..2fffbbc09
--- /dev/null
+++ b/qdp/qdp-python/qumat_qdp/api.py
@@ -0,0 +1,156 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Benchmark API: Rust-optimized pipeline only (no Python for loop).
+
+Usage:
+    from qumat_qdp import QdpBenchmark, ThroughputResult, LatencyResult
+
+    result = (QdpBenchmark(device_id=0).qubits(16).encoding("amplitude")
+              .batches(100, size=64).warmup(2).run_throughput())
+    # result.duration_sec, result.vectors_per_sec
+
+    lat = (QdpBenchmark(device_id=0).qubits(16).encoding("amplitude")
+           .batches(100, size=64).run_latency())
+    # lat.latency_ms_per_vector
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass
+class ThroughputResult:
+    """Result of run_throughput(): duration and vectors per second."""
+
+    duration_sec: float
+    vectors_per_sec: float
+
+
+@dataclass
+class LatencyResult:
+    """Result of run_latency(): duration and ms per vector."""
+
+    duration_sec: float
+    latency_ms_per_vector: float
+
+
+# Cached reference to Rust pipeline (avoids repeated import).
+_run_throughput_pipeline_py: Optional[object] = None
+
+
+def _get_run_throughput_pipeline_py():
+    """Return Rust run_throughput_pipeline_py; raise if not available."""
+    global _run_throughput_pipeline_py
+    if _run_throughput_pipeline_py is not None:
+        return _run_throughput_pipeline_py
+    import _qdp
+
+    fn = getattr(_qdp, "run_throughput_pipeline_py", None)
+    if fn is None:
+        raise RuntimeError(
+            "Rust pipeline not available: _qdp.run_throughput_pipeline_py is missing. "
+            "Force uv to rebuild the extension: from qdp-python run `uv sync --refresh-package qumat-qdp` "
+            "then `uv run python benchmark/run_pipeline_baseline.py`. Or run `maturin develop` and use "
+            "`.venv/bin/python` or `benchmark/run_baseline.sh`."
+        )
+    _run_throughput_pipeline_py = fn
+    return fn
+
+
+class QdpBenchmark:
+    """
+    Builder for throughput/latency benchmarks. Backend is the Rust-optimized pipeline only.
+
+    No Python for loop; run_throughput_pipeline_py runs the full pipeline in Rust
+    (single Python boundary, GIL released). Requires _qdp.run_throughput_pipeline_py
+    (Linux/CUDA build).
+    """
+
+    def __init__(self, device_id: int = 0):
+        self._device_id = device_id
+        self._num_qubits: Optional[int] = None
+        self._encoding_method: str = "amplitude"
+        self._total_batches: Optional[int] = None
+        self._batch_size: int = 64
+        self._warmup_batches: int = 0
+
+    def qubits(self, n: int) -> "QdpBenchmark":
+        self._num_qubits = n
+        return self
+
+    def encoding(self, method: str) -> "QdpBenchmark":
+        self._encoding_method = method
+        return self
+
+    def batches(self, total: int, size: int = 64) -> "QdpBenchmark":
+        self._total_batches = total
+        self._batch_size = size
+        return self
+
+    def prefetch(self, n: int) -> "QdpBenchmark":
+        """No-op for API compatibility; Rust pipeline does not use prefetch from Python."""
+        return self
+
+    def warmup(self, n: int) -> "QdpBenchmark":
+        self._warmup_batches = n
+        return self
+
+    def run_throughput(self) -> ThroughputResult:
+        """Run throughput via Rust optimized pipeline (no Python for loop)."""
+        if self._num_qubits is None or self._total_batches is None:
+            raise ValueError(
+                "Set qubits and batches (e.g. .qubits(16).batches(100, 64))"
+            )
+
+        run_rust = _get_run_throughput_pipeline_py()
+        duration_sec, vectors_per_sec, _ = run_rust(
+            device_id=self._device_id,
+            num_qubits=self._num_qubits,
+            batch_size=self._batch_size,
+            total_batches=self._total_batches,
+            encoding_method=self._encoding_method,
+            warmup_batches=self._warmup_batches,
+            seed=None,
+        )
+        return ThroughputResult(
+            duration_sec=duration_sec, vectors_per_sec=vectors_per_sec
+        )
+
+    def run_latency(self) -> LatencyResult:
+        """Run latency via Rust optimized pipeline (no Python for loop)."""
+        if self._num_qubits is None or self._total_batches is None:
+            raise ValueError(
+                "Set qubits and batches (e.g. .qubits(16).batches(100, 64))"
+            )
+
+        run_rust = _get_run_throughput_pipeline_py()
+        duration_sec, _, latency_ms_per_vector = run_rust(
+            device_id=self._device_id,
+            num_qubits=self._num_qubits,
+            batch_size=self._batch_size,
+            total_batches=self._total_batches,
+            encoding_method=self._encoding_method,
+            warmup_batches=self._warmup_batches,
+            seed=None,
+        )
+        return LatencyResult(
+            duration_sec=duration_sec,
+            latency_ms_per_vector=latency_ms_per_vector,
+        )
diff --git a/qdp/qdp-python/qumat_qdp/loader.py b/qdp/qdp-python/qumat_qdp/loader.py
new file mode 100644
index 000000000..d5c37d65a
--- /dev/null
+++ b/qdp/qdp-python/qumat_qdp/loader.py
@@ -0,0 +1,202 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Quantum Data Loader: Python builder for Rust-backed batch iterator.
+
+Usage:
+    from qumat_qdp import QuantumDataLoader
+
+    loader = (QuantumDataLoader(device_id=0).qubits(16).encoding("amplitude")
+              .batches(100, size=64).source_synthetic())
+    for qt in loader:
+        batch = torch.from_dlpack(qt)
+        ...
+"""
+
+from __future__ import annotations
+
+from functools import lru_cache
+from typing import TYPE_CHECKING, Iterator, Optional
+
+if TYPE_CHECKING:
+    import _qdp  # noqa: F401 -- for type checkers only
+
+# Rust interface expects seed as Option<u64>: non-negative and <= 2^64 - 1.
+# Ref: qdp-core PipelineConfig seed: Option<u64>
+_U64_MAX = 2**64 - 1
+
+# Lazy import _qdp at runtime until __iter__ is used; TYPE_CHECKING import above
+# is for type checkers only so they can resolve "_qdp.*" annotations if needed.
+
+
+@lru_cache(maxsize=1)
+def _get_qdp():
+    import _qdp as m
+
+    return m
+
+
+def _validate_loader_args(
+    *,
+    device_id: int,
+    num_qubits: int,
+    batch_size: int,
+    total_batches: int,
+    encoding_method: str,
+    seed: Optional[int],
+) -> None:
+    """Validate arguments before passing to Rust (PipelineConfig / 
create_synthetic_loader)."""
+    if device_id < 0:
+        raise ValueError(f"device_id must be non-negative, got {device_id!r}")
+    if not isinstance(num_qubits, int) or num_qubits < 1:
+        raise ValueError(f"num_qubits must be a positive integer, got {num_qubits!r}")
+    if not isinstance(batch_size, int) or batch_size < 1:
+        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")
+    if not isinstance(total_batches, int) or total_batches < 1:
+        raise ValueError(
+            f"total_batches must be a positive integer, got {total_batches!r}"
+        )
+    if not encoding_method or not isinstance(encoding_method, str):
+        raise ValueError(
+            f"encoding_method must be a non-empty string, got {encoding_method!r}"
+        )
+    if seed is not None:
+        if not isinstance(seed, int):
+            raise ValueError(
+                f"seed must be None or an integer, got {type(seed).__name__!r}"
+            )
+        if seed < 0 or seed > _U64_MAX:
+            raise ValueError(
+                f"seed must be in range [0, {_U64_MAX}] (Rust u64), got {seed!r}"
+            )
+
+
+class QuantumDataLoader:
+    """
+    Builder for a synthetic-data quantum encoding iterator.
+
+    Yields one QuantumTensor (batch) per iteration. All encoding runs in Rust;
+    __iter__ returns the Rust-backed iterator from create_synthetic_loader.
+    """
+
+    def __init__(
+        self,
+        device_id: int = 0,
+        num_qubits: int = 16,
+        batch_size: int = 64,
+        total_batches: int = 100,
+        encoding_method: str = "amplitude",
+        seed: Optional[int] = None,
+    ) -> None:
+        _validate_loader_args(
+            device_id=device_id,
+            num_qubits=num_qubits,
+            batch_size=batch_size,
+            total_batches=total_batches,
+            encoding_method=encoding_method,
+            seed=seed,
+        )
+        self._device_id = device_id
+        self._num_qubits = num_qubits
+        self._batch_size = batch_size
+        self._total_batches = total_batches
+        self._encoding_method = encoding_method
+        self._seed = seed
+
+    def qubits(self, n: int) -> QuantumDataLoader:
+        """Set number of qubits. Returns self for chaining."""
+        if not isinstance(n, int) or n < 1:
+            raise ValueError(f"num_qubits must be a positive integer, got {n!r}")
+        self._num_qubits = n
+        return self
+
+    def encoding(self, method: str) -> QuantumDataLoader:
+        """Set encoding method (e.g. 'amplitude', 'angle', 'basis'). Returns self."""
+        if not method or not isinstance(method, str):
+            raise ValueError(
+                f"encoding_method must be a non-empty string, got {method!r}"
+            )
+        self._encoding_method = method
+        return self
+
+    def batches(self, total: int, size: int = 64) -> QuantumDataLoader:
+        """Set total number of batches and batch size. Returns self."""
+        if not isinstance(total, int) or total < 1:
+            raise ValueError(f"total_batches must be a positive integer, got {total!r}")
+        if not isinstance(size, int) or size < 1:
+            raise ValueError(f"batch_size must be a positive integer, got {size!r}")
+        self._total_batches = total
+        self._batch_size = size
+        return self
+
+    def source_synthetic(
+        self,
+        total_batches: Optional[int] = None,
+    ) -> QuantumDataLoader:
+        """Use synthetic data source (default). Optionally override total_batches. Returns self."""
+        if total_batches is not None:
+            if not isinstance(total_batches, int) or total_batches < 1:
+                raise ValueError(
+                    f"total_batches must be a positive integer, got {total_batches!r}"
+                )
+            self._total_batches = total_batches
+        return self
+
+    def seed(self, s: Optional[int] = None) -> QuantumDataLoader:
+        """Set RNG seed for reproducible synthetic data (must fit Rust u64: 0 <= seed <= 2^64-1). Returns self."""
+        if s is not None:
+            if not isinstance(s, int):
+                raise ValueError(
+                    f"seed must be None or an integer, got {type(s).__name__!r}"
+                )
+            if s < 0 or s > _U64_MAX:
+                raise ValueError(
+                    f"seed must be in range [0, {_U64_MAX}] (Rust u64), got {s!r}"
+                )
+        self._seed = s
+        return self
+
+    def __iter__(self) -> Iterator[object]:
+        """Return Rust-backed iterator that yields one QuantumTensor per batch."""
+        _validate_loader_args(
+            device_id=self._device_id,
+            num_qubits=self._num_qubits,
+            batch_size=self._batch_size,
+            total_batches=self._total_batches,
+            encoding_method=self._encoding_method,
+            seed=self._seed,
+        )
+        qdp = _get_qdp()
+        QdpEngine = getattr(qdp, "QdpEngine", None)
+        if QdpEngine is None:
+            raise RuntimeError(
+                "_qdp.QdpEngine not found. Build the extension with maturin develop."
+            )
+        engine = QdpEngine(device_id=self._device_id)
+        create_synthetic_loader = getattr(engine, "create_synthetic_loader", None)
+        if create_synthetic_loader is None:
+            raise RuntimeError(
+                "create_synthetic_loader not available (e.g. only on Linux with CUDA)."
+            )
+        loader = create_synthetic_loader(
+            total_batches=self._total_batches,
+            batch_size=self._batch_size,
+            num_qubits=self._num_qubits,
+            encoding_method=self._encoding_method,
+            seed=self._seed,
+        )
+        return iter(loader)
diff --git a/qdp/qdp-python/src/lib.rs b/qdp/qdp-python/src/lib.rs
index 69743a862..ff1083f97 100644
--- a/qdp/qdp-python/src/lib.rs
+++ b/qdp/qdp-python/src/lib.rs
@@ -15,13 +15,21 @@
 // limitations under the License.
 
 use numpy::{PyReadonlyArray1, PyReadonlyArray2, PyUntypedArrayMethods};
-use pyo3::exceptions::PyRuntimeError;
+use pyo3::exceptions::{PyRuntimeError, PyStopIteration};
 use pyo3::ffi;
 use pyo3::prelude::*;
 use qdp_core::dlpack::{DL_FLOAT, DLDeviceType, DLManagedTensor};
 use qdp_core::{Precision, QdpEngine as CoreEngine};
 use std::ffi::c_void;
 
+#[cfg(target_os = "linux")]
+use qdp_core::{PipelineConfig, PipelineIterator, PipelineRunResult, run_throughput_pipeline};
+
+/// Wraps raw DLPack pointer so it can cross `py.allow_threads` (closure return must be `Send`).
+/// Safe: DLPack pointer handover across contexts; GIL is released only during the closure.
+struct SendPtr(pub *mut DLManagedTensor);
+unsafe impl Send for SendPtr {}
+
 /// Quantum tensor wrapper implementing DLPack protocol
 ///
 /// This class wraps a GPU-allocated quantum state vector and implements
@@ -112,9 +120,7 @@ impl QuantumTensor {
 
         unsafe {
             let tensor = &(*self.ptr).dl_tensor;
-            // device_type is an enum, convert to integer
-            // kDLCUDA = 2, kDLCPU = 1
-            // Ref: https://github.com/dmlc/dlpack/blob/6ea9b3eb64c881f614cd4537f95f0e125a35555c/include/dlpack/dlpack.h#L76-L80
+            // DLPack device_type: kDLCUDA = 2, kDLCPU = 1
             let device_type = match tensor.device.device_type {
                 qdp_core::dlpack::DLDeviceType::kDLCUDA => 2,
                 qdp_core::dlpack::DLDeviceType::kDLCPU => 1,
@@ -151,6 +157,74 @@ impl Drop for QuantumTensor {
 unsafe impl Send for QuantumTensor {}
 unsafe impl Sync for QuantumTensor {}
 
+/// Python iterator yielding one QuantumTensor (batch) per __next__. Releases GIL during next_batch().
+#[cfg(target_os = "linux")]
+#[pyclass]
+struct PyQuantumLoader {
+    inner: Option<PipelineIterator>,
+}
+
+#[cfg(target_os = "linux")]
+#[pymethods]
+impl PyQuantumLoader {
+    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+        slf
+    }
+
+    /// Returns the next batch as QuantumTensor; raises StopIteration when exhausted. Releases GIL during encode.
+    fn __next__(mut slf: PyRefMut<'_, Self>, py: Python<'_>) -> PyResult<QuantumTensor> {
+        let mut inner_iter = match slf.inner.take() {
+            Some(it) => it,
+            None => return Err(PyStopIteration::new_err("loader exhausted")),
+        };
+
+        #[allow(deprecated)]
+        let result = py.allow_threads(move || {
+            let res = inner_iter.next_batch();
+            match res {
+                Ok(Some(ptr)) => Ok((inner_iter, Some(SendPtr(ptr)))),
+                Ok(None) => Ok((inner_iter, None)),
+                Err(e) => Err((inner_iter, e)),
+            }
+        });
+
+        match result {
+            Ok((returned_iter, Some(send_ptr))) => {
+                slf.inner = Some(returned_iter);
+                Ok(QuantumTensor {
+                    ptr: send_ptr.0,
+                    consumed: false,
+                })
+            }
+            Ok((_, None)) => Err(PyStopIteration::new_err("loader exhausted")),
+            Err((returned_iter, e)) => {
+                slf.inner = Some(returned_iter);
+                Err(PyRuntimeError::new_err(e.to_string()))
+            }
+        }
+    }
+}
+
+/// Stub PyQuantumLoader when not on Linux (CUDA pipeline not available).
+#[cfg(not(target_os = "linux"))]
+#[pyclass]
+struct PyQuantumLoader {}
+
+#[cfg(not(target_os = "linux"))]
+#[pymethods]
+impl PyQuantumLoader {
+    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+        slf
+    }
+
+    fn __next__(&self, _py: Python<'_>) -> PyResult<QuantumTensor> {
+        Err(PyRuntimeError::new_err(
+            "QuantumDataLoader is only available on Linux (CUDA pipeline). \
+             Build and run from a Linux host with CUDA.",
+        ))
+    }
+}
+
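
The `__next__` contract above (one `QuantumTensor` per call, `StopIteration` once the configured batches are exhausted, encode errors surfaced as `RuntimeError` while the iterator is kept) can be exercised manually from Python. A hedged sketch, assuming `loader` was obtained from `QdpEngine.create_synthetic_loader(...)` on a Linux + CUDA host:

```python
# Hedged sketch: `loader` is assumed to come from QdpEngine.create_synthetic_loader(...)
# on a Linux + CUDA host. next() encodes one batch with the GIL released.
it = iter(loader)            # PyQuantumLoader.__iter__ returns the loader itself
try:
    while True:
        qt = next(it)        # one QuantumTensor per batch
except StopIteration:
    pass                     # raised once the loader is exhausted
except RuntimeError as err:  # encode errors are re-raised; the iterator is retained
    print(f"batch failed: {err}")
```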
 /// Helper to detect PyTorch tensor
 fn is_pytorch_tensor(obj: &Bound<'_, PyAny>) -> PyResult<bool> {
     let type_obj = obj.get_type();
@@ -266,12 +340,8 @@ fn get_torch_cuda_stream_ptr(tensor: &Bound<'_, PyAny>) -> PyResult<*mut c_void>
         )));
     }
 
+    // PyTorch default stream can report cuda_stream as 0; treat as valid (Rust sync is no-op for null).
     let stream_ptr: u64 = stream.getattr("cuda_stream")?.extract()?;
-    if stream_ptr == 0 {
-        return Err(PyRuntimeError::new_err(
-            "PyTorch returned a null CUDA stream pointer",
-        ));
-    }
     Ok(stream_ptr as *mut c_void)
 }
 
@@ -1052,6 +1122,180 @@ impl QdpEngine {
             consumed: false,
         })
     }
+
+    /// Create a synthetic-data loader iterator for use in Python `for qt in loader`.
+    ///
+    /// Yields one QuantumTensor (batch) per iteration; releases GIL during encode.
+    /// Use with QuantumDataLoader builder or directly for streaming encode.
+    ///
+    /// Args:
+    ///     total_batches: Number of batches to yield
+    ///     batch_size: Samples per batch
+    ///     num_qubits: Qubits per sample
+    ///     encoding_method: "amplitude", "angle", or "basis"
+    ///     seed: Optional RNG seed for reproducible synthetic data
+    ///
+    /// Returns:
+    ///     PyQuantumLoader: iterator yielding QuantumTensor per __next__
+    #[cfg(target_os = "linux")]
+    #[pyo3(signature = (total_batches, batch_size=64, num_qubits=16, encoding_method="amplitude", seed=None))]
+    fn create_synthetic_loader(
+        &self,
+        total_batches: usize,
+        batch_size: usize,
+        num_qubits: u32,
+        encoding_method: &str,
+        seed: Option<u64>,
+    ) -> PyResult<PyQuantumLoader> {
+        let config = PipelineConfig {
+            device_id: self.engine.device().ordinal(),
+            num_qubits,
+            batch_size,
+            total_batches,
+            encoding_method: encoding_method.to_string(),
+            seed,
+            warmup_batches: 0,
+        };
+        let iter = PipelineIterator::new_synthetic(self.engine.clone(), config)
+            .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
+        Ok(PyQuantumLoader { inner: Some(iter) })
+    }
+
+    /// Stub when not on Linux: create_synthetic_loader is only implemented on Linux.
+    #[cfg(not(target_os = "linux"))]
+    #[pyo3(signature = (total_batches, batch_size=64, num_qubits=16, encoding_method="amplitude", seed=None))]
+    fn create_synthetic_loader(
+        &self,
+        total_batches: usize,
+        batch_size: usize,
+        num_qubits: u32,
+        encoding_method: &str,
+        seed: Option<u64>,
+    ) -> PyResult<PyQuantumLoader> {
+        let _ = (total_batches, batch_size, num_qubits, encoding_method, seed);
+        Err(PyRuntimeError::new_err(
+            "create_synthetic_loader is only available on Linux (CUDA 
pipeline). \
+             Build and run from a Linux host with CUDA.",
+        ))
+    }
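
A hedged sketch of driving `create_synthetic_loader` directly from Python (Linux + CUDA assumed), matching the signature above:

```python
# Hedged sketch: requires Linux + CUDA and a built _qdp extension.
from _qdp import QdpEngine

engine = QdpEngine(device_id=0)
loader = engine.create_synthetic_loader(
    total_batches=10,
    batch_size=64,
    num_qubits=16,
    encoding_method="amplitude",
    seed=123,
)
for qt in loader:   # each qt is a QuantumTensor; encoding runs with the GIL released
    ...
```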
+
+    /// Run dual-stream pipeline for encoding (H2D + kernel overlap). Internal API.
+    ///
+    /// Exposes run_dual_stream_pipeline from qdp-core. Accepts 1D host data (single sample).
+    /// Does not return a tensor; use for throughput measurement or when state is not needed.
+    /// Currently supports amplitude encoding only.
+    ///
+    /// Args:
+    ///     host_data: 1D input (list or NumPy array, float64)
+    ///     num_qubits: Number of qubits
+    ///     encoding_method: "amplitude" (other methods not yet supported for 
this path)
+    #[cfg(target_os = "linux")]
+    fn _encode_stream_internal(
+        &self,
+        host_data: &Bound<'_, PyAny>,
+        num_qubits: usize,
+        encoding_method: &str,
+    ) -> PyResult<()> {
+        let data_slice: Vec<f64> = if 
host_data.hasattr("__array_interface__")? {
+            let array_1d = 
host_data.extract::<PyReadonlyArray1<f64>>().map_err(|_| {
+                PyRuntimeError::new_err("host_data must be 1D NumPy array with 
dtype float64")
+            })?;
+            array_1d
+                .as_slice()
+                .map_err(|_| PyRuntimeError::new_err("NumPy array must be 
contiguous (C-order)"))?
+                .to_vec()
+        } else {
+            host_data.extract::<Vec<f64>>().map_err(|_| {
+                PyRuntimeError::new_err("host_data must be 1D list/array of 
float64")
+            })?
+        };
+        self.engine
+            .run_dual_stream_encode(&data_slice, num_qubits, encoding_method)
+            .map_err(|e| 
PyRuntimeError::new_err(format!("run_dual_stream_encode failed: {}", e)))
+    }
+}
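
For completeness, a hedged sketch of the internal dual-stream path exposed above (amplitude only, no tensor returned; the 16-element input for 4 qubits is an assumed sizing):

```python
# Hedged sketch: internal API, Linux + CUDA only; nothing is returned.
import numpy as np
from _qdp import QdpEngine

engine = QdpEngine(device_id=0)
data = np.random.rand(16)                             # assumed: 2**4 amplitudes for 4 qubits
engine._encode_stream_internal(data, 4, "amplitude")  # H2D + kernel overlap path
```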
+
+/// Runs the full throughput pipeline in Rust with GIL released. Returns (duration_sec, vectors_per_sec, latency_ms_per_vector).
+#[cfg(target_os = "linux")]
+#[pyfunction]
+#[pyo3(signature = (device_id=0, num_qubits=16, batch_size=64, total_batches=100, encoding_method="amplitude", warmup_batches=0, seed=None))]
+#[allow(clippy::too_many_arguments)]
+fn run_throughput_pipeline_py_impl(
+    py: Python<'_>,
+    device_id: usize,
+    num_qubits: u32,
+    batch_size: usize,
+    total_batches: usize,
+    encoding_method: &str,
+    warmup_batches: usize,
+    seed: Option<u64>,
+) -> PyResult<(f64, f64, f64)> {
+    let encoding_method = encoding_method.to_string();
+    #[allow(deprecated)]
+    let result: Result<PipelineRunResult, qdp_core::MahoutError> = 
py.allow_threads(move || {
+        let config = PipelineConfig {
+            device_id,
+            num_qubits,
+            batch_size,
+            total_batches,
+            encoding_method,
+            seed,
+            warmup_batches,
+        };
+        run_throughput_pipeline(&config)
+    });
+    let res = result.map_err(|e: qdp_core::MahoutError| PyRuntimeError::new_err(e.to_string()))?;
+    Ok((
+        res.duration_sec,
+        res.vectors_per_sec,
+        res.latency_ms_per_vector,
+    ))
+}
+
+/// Stub when not on Linux: run_throughput_pipeline_py is only implemented on Linux (CUDA pipeline).
+#[cfg(not(target_os = "linux"))]
+#[pyfunction]
+#[pyo3(signature = (device_id=0, num_qubits=16, batch_size=64, total_batches=100, encoding_method="amplitude", warmup_batches=0, seed=None))]
+fn run_throughput_pipeline_py_impl(
+    _py: Python<'_>,
+    _device_id: usize,
+    _num_qubits: u32,
+    _batch_size: usize,
+    _total_batches: usize,
+    _encoding_method: &str,
+    _warmup_batches: usize,
+    _seed: Option<u64>,
+) -> PyResult<(f64, f64, f64)> {
+    Err(PyRuntimeError::new_err(
+        "run_throughput_pipeline_py is only available on Linux (CUDA 
pipeline). \
+         Build and run from a Linux host with CUDA.",
+    ))
+}
+
+/// Public wrapper so the same name is always present in the module (import never fails).
+#[pyfunction]
+#[pyo3(signature = (device_id=0, num_qubits=16, batch_size=64, total_batches=100, encoding_method="amplitude", warmup_batches=0, seed=None))]
+#[allow(clippy::too_many_arguments)]
+fn run_throughput_pipeline_py(
+    py: Python<'_>,
+    device_id: usize,
+    num_qubits: u32,
+    batch_size: usize,
+    total_batches: usize,
+    encoding_method: &str,
+    warmup_batches: usize,
+    seed: Option<u64>,
+) -> PyResult<(f64, f64, f64)> {
+    run_throughput_pipeline_py_impl(
+        py,
+        device_id,
+        num_qubits,
+        batch_size,
+        total_batches,
+        encoding_method,
+        warmup_batches,
+        seed,
+    )
 }
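
A hedged sketch of calling the wrapper above from Python and unpacking the `(duration_sec, vectors_per_sec, latency_ms_per_vector)` tuple; on non-Linux builds the same call raises `RuntimeError`:

```python
# Hedged sketch: the full pipeline runs in Rust with the GIL released (Linux + CUDA).
from _qdp import run_throughput_pipeline_py

duration_sec, vectors_per_sec, latency_ms_per_vector = run_throughput_pipeline_py(
    device_id=0,
    num_qubits=16,
    batch_size=64,
    total_batches=100,
    encoding_method="amplitude",
    warmup_batches=10,
    seed=7,
)
print(f"{vectors_per_sec:.1f} vectors/s, {latency_ms_per_vector:.3f} ms/vector")
```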
 
 /// Quantum Data Plane (QDP) Python module
@@ -1059,13 +1303,12 @@ impl QdpEngine {
 /// GPU-accelerated quantum data encoding with DLPack integration.
 #[pymodule]
 fn _qdp(m: &Bound<'_, PyModule>) -> PyResult<()> {
-    // Initialize Rust logging system - respect RUST_LOG environment variable
-    // Ref: https://docs.rs/env_logger/latest/env_logger/
-    // try_init() won't fail if logger is already initialized (e.g., by another library)
-    // This allows Rust log messages to be visible when RUST_LOG is set
+    // Respect RUST_LOG; try_init() is idempotent if already initialized
     let _ = env_logger::Builder::from_default_env().try_init();
 
     m.add_class::<QdpEngine>()?;
     m.add_class::<QuantumTensor>()?;
+    m.add_class::<PyQuantumLoader>()?;
+    m.add_function(pyo3::wrap_pyfunction!(run_throughput_pipeline_py, m)?)?;
     Ok(())
 }
diff --git a/testing/qdp/test_benchmark_api.py b/testing/qdp/test_benchmark_api.py
new file mode 100644
index 000000000..d75f3a499
--- /dev/null
+++ b/testing/qdp/test_benchmark_api.py
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for the benchmark API (QdpBenchmark, Rust pipeline only)."""
+
+from pathlib import Path
+
+import pytest
+
+# Allow importing benchmark API from qdp-python/benchmark and qumat_qdp from qdp-python
+_sys = __import__("sys")
+_qdp_python = Path(__file__).resolve().parent.parent.parent / "qdp" / "qdp-python"
+_bench_dir = _qdp_python / "benchmark"
+if _qdp_python.exists() and str(_qdp_python) not in _sys.path:
+    _sys.path.insert(0, str(_qdp_python))
+if _bench_dir.exists() and str(_bench_dir) not in _sys.path:
+    _sys.path.insert(0, str(_bench_dir))
+
+from .qdp_test_utils import requires_qdp  # noqa: E402
+
+
+@requires_qdp
+def test_benchmark_api_import():
+    """Test that the benchmark API exports only Rust-pipeline path (no 
encode_stream / Python loop)."""
+    import api
+
+    assert hasattr(api, "QdpBenchmark")
+    assert hasattr(api, "ThroughputResult")
+    assert hasattr(api, "LatencyResult")
+    # No naive Python for-loop API
+    assert not hasattr(api, "encode_stream")
+    assert not hasattr(api, "create_pipeline")
+    assert not hasattr(api, "StreamPipeline")
+    assert not hasattr(api, "PipelineConfig")
+
+
+@requires_qdp
+@pytest.mark.gpu
+def test_qdp_benchmark_run_throughput():
+    """QdpBenchmark.run_throughput() calls Rust pipeline and returns ThroughputResult (requires GPU)."""
+    import api
+
+    result = (
+        api.QdpBenchmark(device_id=0)
+        .qubits(2)
+        .encoding("amplitude")
+        .batches(2, size=4)
+        .prefetch(2)
+        .run_throughput()
+    )
+    assert isinstance(result, api.ThroughputResult)
+    assert result.duration_sec >= 0
+    assert result.vectors_per_sec > 0
+
+
+@requires_qdp
+@pytest.mark.gpu
+def test_qdp_benchmark_run_latency():
+    """QdpBenchmark.run_latency() calls Rust pipeline and returns LatencyResult (requires GPU)."""
+    import api
+
+    result = (
+        api.QdpBenchmark(device_id=0)
+        .qubits(2)
+        .encoding("amplitude")
+        .batches(2, size=4)
+        .prefetch(2)
+        .run_latency()
+    )
+    assert isinstance(result, api.LatencyResult)
+    assert result.duration_sec >= 0
+    assert result.latency_ms_per_vector > 0
+
+
+@requires_qdp
[email protected]("method", ["run_throughput", "run_latency"])
+def test_qdp_benchmark_validation(method):
+    """QdpBenchmark.run_throughput() and run_latency() raise if qubits/batches 
not set."""
+    import api
+
+    bench = api.QdpBenchmark(device_id=0)
+    runner = getattr(bench, method)
+    with pytest.raises(ValueError, match="qubits and batches"):
+        runner()
+
+
+@requires_qdp
+@pytest.mark.gpu
+def test_qdp_benchmark_device_id_propagated():
+    """QdpBenchmark(device_id=...) propagates device_id to Rust pipeline when running."""
+    import api
+
+    # When qubits/batches are set, run_throughput uses the bench's device_id (e.g. 0).
+    result = (
+        api.QdpBenchmark(device_id=0)
+        .qubits(2)
+        .encoding("amplitude")
+        .batches(2, size=4)
+        .run_throughput()
+    )
+    assert hasattr(result, "vectors_per_sec")
+    assert result.vectors_per_sec >= 0
diff --git a/testing/qdp/test_bindings.py b/testing/qdp/test_bindings.py
index 0e692a366..a0f043456 100644
--- a/testing/qdp/test_bindings.py
+++ b/testing/qdp/test_bindings.py
@@ -438,9 +438,9 @@ def test_encode_cuda_tensor_preserves_input(data_shape, is_batch):
 
 @requires_qdp
 @pytest.mark.gpu
[email protected]("encoding_method", ["basis", "angle"])
[email protected]("encoding_method", ["basis"])
 def test_encode_cuda_tensor_unsupported_encoding(encoding_method):
-    """Test error when using CUDA tensor with unsupported encoding method."""
+    """Test error when using CUDA tensor with unsupported encoding (CUDA 
supports amplitude and angle only)."""
     pytest.importorskip("torch")
     import torch
     from _qdp import QdpEngine
@@ -454,7 +454,10 @@ def test_encode_cuda_tensor_unsupported_encoding(encoding_method):
     # Use non-zero data to avoid normalization issues
     data = torch.tensor([1.0, 0.0, 0.0, 0.0], dtype=torch.float64, device="cuda:0")
 
-    with pytest.raises(RuntimeError, match="only supports 'amplitude' method"):
+    with pytest.raises(
+        RuntimeError,
+        match="only supports 'amplitude' and 'angle' methods.*Use 
tensor.cpu\\(\\)",
+    ):
         engine.encode(data, 2, encoding_method)
 
 
