rich7420 opened a new pull request, #1000:
URL: https://github.com/apache/mahout/pull/1000

   ### Purpose of PR
   <!-- Describe what this PR does. -->
   Introduces a **stateful Rust iterator** (Quantum Data Loader) that yields one encoded batch (as a DLPack tensor) per iteration, so Python can drive encoding with `for qt in loader:` instead of going through a closed benchmark loop. The same Rust core now serves both the **benchmark** path (full pipeline in Rust, stats only) and the **data loader** path (batch-by-batch iteration). The public Python API moves out of `benchmark/` into a new package, **`qumat_qdp`**; the benchmark scripts import from `qumat_qdp` and remain its primary consumers.
   
   
   ### qdp-core (Rust)
   
   - **`pipeline_runner.rs`** (new): `PipelineConfig`, `PipelineRunResult`, 
`run_throughput_pipeline`, `run_latency_pipeline`; shared `vector_len`, 
`generate_batch`, `fill_sample`. Adds **`DataSource`** enum (Synthetic only; 
File reserved for Phase 2) and **`PipelineIterator`** with 
`new_synthetic(engine, config)` and `next_batch(&mut self) -> 
Result<Option<*mut DLManagedTensor>>` reusing `encode_batch`.
   - **`lib.rs`**: `QdpEngine` implements **`Clone`**; re-exports `DataSource`, 
`PipelineIterator`, `PipelineConfig`, `PipelineRunResult`, 
`run_throughput_pipeline`, `run_latency_pipeline` (Linux only).
   - **`gpu/encodings/amplitude.rs`**: adds `run_amplitude_dual_stream_pipeline` and exposes the dual-stream encode path used by the benchmarks.
   
   ### qdp-python (Rust bindings)
   
   - **`SendPtr`** wrapper: wraps `*mut DLManagedTensor` so the raw pointer can cross `py.allow_threads` (the closure's return value must be `Send`). Used only to release the GIL during encode.
   - **`PyQuantumLoader`** (`#[pyclass]`): holds an `Option<PipelineIterator>` and implements the Python iterator protocol: `__iter__` returns `self`; `__next__` takes the iterator out with `take()`, runs `next_batch()` inside `py.allow_threads`, then restores the iterator and returns a `QuantumTensor`, or raises `StopIteration` when exhausted. Stub on non-Linux. (A consumer-side sketch follows this list.)
   - **`QdpEngine::create_synthetic_loader(...)`**: builds `PipelineConfig`, 
calls `PipelineIterator::new_synthetic(engine.clone(), config)`, returns 
`PyQuantumLoader`. Stub on non-Linux.
   - **`run_throughput_pipeline_py`**: the pipeline run is executed inside **`py.allow_threads`**, so the full benchmark loop runs with the GIL released (replacing the previous detach/`allow_threads` usage where appropriate).
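
From the Python side, the loader produced by these bindings behaves like an ordinary single-pass iterator. A minimal consumer-side sketch, assuming only the `qumat_qdp` names introduced in this PR (the builder chain mirrors the Behaviour section below):

```python
from qumat_qdp import QuantumDataLoader

# Build a synthetic loader; the builder defers all work to the Rust iterator.
loader = QuantumDataLoader(device_id=0).qubits(16).batches(100, 64).source_synthetic()

# __iter__ hands back the Rust-backed PyQuantumLoader, which is single-pass.
it = iter(loader)

while True:
    try:
        qt = next(it)          # GIL is released inside next_batch() on the Rust side
    except StopIteration:      # raised once the configured batches are exhausted
        break
    # qt is a QuantumTensor wrapping one encoded DLPack batch
    ...
```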
   
   ### qumat_qdp (Python package)
   
   - **`qumat_qdp/`** at the project root:
     - **`__init__.py`**: re-exports `QdpEngine`, `QuantumTensor`, `QdpBenchmark`, `ThroughputResult`, `LatencyResult`, `QuantumDataLoader`, `run_throughput_pipeline_py`.
     - **`api.py`**: `QdpBenchmark`, `ThroughputResult`, `LatencyResult`; calls `_qdp.run_throughput_pipeline_py`.
     - **`loader.py`**: `QuantumDataLoader` builder; `__iter__` calls `engine.create_synthetic_loader` and returns the Rust iterator (a shape sketch follows this list).
   - **`pyproject.toml`**: **`python-source = "."`** so the root `qumat_qdp` 
package is included in the wheel.
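
For orientation, a shape-only sketch of the `loader.py` builder described above. The import location of `QdpEngine`, its constructor argument, and the keyword arguments forwarded to `create_synthetic_loader` are assumptions (the PR elides the actual signature); the real module is the source of truth.

```python
# qumat_qdp/loader.py -- shape sketch only.
# Assumptions: the Rust extension module is importable as ._qdp, QdpEngine takes a
# device_id, and create_synthetic_loader accepts the keyword arguments shown.
from ._qdp import QdpEngine


class QuantumDataLoader:
    """Builder that defers all work to the Rust iterator created in __iter__."""

    def __init__(self, device_id=0):
        self._device_id = device_id
        self._num_qubits = None
        self._num_batches = None
        self._batch_size = None
        self._synthetic = False

    def qubits(self, n):
        self._num_qubits = n
        return self

    def batches(self, num_batches, batch_size):
        self._num_batches = num_batches
        self._batch_size = batch_size
        return self

    def source_synthetic(self):
        self._synthetic = True
        return self

    def __iter__(self):
        # Hands back the Rust-backed PyQuantumLoader, which yields one
        # QuantumTensor (encoded DLPack batch) per next() call.
        engine = QdpEngine(self._device_id)
        return engine.create_synthetic_loader(   # argument names are hypothetical
            num_qubits=self._num_qubits,
            num_batches=self._num_batches,
            batch_size=self._batch_size,
        )
```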
   
   ### Benchmark scripts
   
   - **Imports**: All benchmark scripts now use `from qumat_qdp import ...` (or 
re-export). **`benchmark/api.py`** and **`benchmark/loader.py`** only re-export 
from `qumat_qdp` for backward compatibility.
   - **Import path**: Each script inserts the project root into `sys.path` 
before importing `qumat_qdp`, so `uv run python 
benchmark/run_pipeline_baseline.py` (and similar) works without extra 
`PYTHONPATH`.
   - **`benchmark_loader_throughput.py`** (new): runs throughput by iterating 
`QuantumDataLoader` (`for qt in loader`) and compares with 
`QdpBenchmark.run_throughput()` (full Rust pipeline).
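
A condensed sketch of what `benchmark_loader_throughput.py` does, combining the `sys.path` pattern with the loader-vs-pipeline comparison; the path expression, batch sizes, and throughput arithmetic are illustrative (they mirror the Behaviour section), and the real script is authoritative.

```python
# Condensed sketch of benchmark/benchmark_loader_throughput.py (illustrative only).
import sys
import time
from pathlib import Path

# Insert the project root so `qumat_qdp` resolves without extra PYTHONPATH;
# the exact path expression is illustrative.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from qumat_qdp import QdpBenchmark, QuantumDataLoader

NUM_BATCHES, BATCH_SIZE = 100, 64

# Loader path: Python drives encoding one batch at a time.
loader = (
    QuantumDataLoader(device_id=0)
    .qubits(16)
    .batches(NUM_BATCHES, BATCH_SIZE)
    .source_synthetic()
)
start = time.perf_counter()
num_yielded = sum(1 for _ in loader)   # each item is one encoded QuantumTensor batch
elapsed = time.perf_counter() - start
print(f"loader iteration: {num_yielded * BATCH_SIZE / elapsed:.1f} samples/s")

# Benchmark path: the full pipeline runs in Rust with the GIL released.
result = QdpBenchmark(device_id=0).qubits(16).batches(NUM_BATCHES, BATCH_SIZE).run_throughput()
print("full Rust pipeline:", result)   # ThroughputResult; fields not enumerated here
```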
   
   ## Behaviour
   
   - **Benchmark path**: Unchanged from the caller’s perspective. `QdpBenchmark(device_id=0).qubits(16).batches(100, 64).run_throughput()` still runs the full pipeline in Rust with the GIL released via `run_throughput_pipeline_py`.
   - **Loader path**: `QuantumDataLoader(device_id=0).qubits(16).batches(100, 64).source_synthetic()` then `for qt in loader:` yields one `QuantumTensor` (batch) per iteration; the GIL is released during each `next_batch()` call in Rust.
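
One note on consuming the yielded batches: if `QuantumTensor` exposes the standard DLPack protocol (`__dlpack__`/`__dlpack_device__`) — this PR describes the batches as DLPack tensors but does not spell out the Python-side conversion, so treat this as an assumption — a framework could view each batch without copying:

```python
# Assumption: QuantumTensor implements the standard DLPack protocol
# (__dlpack__ / __dlpack_device__); not confirmed by this PR's description.
import torch
from qumat_qdp import QuantumDataLoader

loader = QuantumDataLoader(device_id=0).qubits(16).batches(100, 64).source_synthetic()

for qt in loader:
    batch = torch.from_dlpack(qt)   # zero-copy view of the encoded batch
    # ... hand `batch` to downstream GPU processing ...
```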
   
   ## Testing
   
   - Existing benchmark flow: `uv run python 
benchmark/run_pipeline_baseline.py`, `benchmark_throughput.py`, 
`benchmark_latency.py`.
   - Loader throughput: `uv run python 
benchmark/benchmark_loader_throughput.py` (compares loader iteration vs full 
pipeline).
   
   ### Related Issues or PRs
   <!-- Add links to related issues or PRs. -->
   <!-- - Closes #123  -->
   <!-- - Related to #123   -->
   Related to #969 
   
   ### Changes Made
   <!-- Please mark one with an "x"   -->
   - [ ] Bug fix
   - [x] New feature
   - [x] Refactoring
   - [ ] Documentation
   - [ ] Test
   - [ ] CI/CD pipeline
   - [ ] Other
   
   ### Breaking Changes
   <!-- Does this PR introduce a breaking change? -->
   - [x] Yes
   - [ ] No
   
   ### Checklist
   <!-- Please mark each item with an "x" when complete -->
   <!-- If not all items are complete, please open this as a **Draft PR**.
   Once all requirements are met, mark as ready for review. -->
   
   - [x] Added or updated unit tests for all changes
   - [x] Added or updated documentation for all changes
   - [x] Successfully built and ran all unit tests or manual tests locally
   - [ ] PR title follows "MAHOUT-XXX: Brief Description" format (if related to 
an issue)
   - [x] Code follows ASF guidelines
   

