rich7420 opened a new pull request, #1000: URL: https://github.com/apache/mahout/pull/1000
### Purpose of PR
<!-- Describe what this PR does. -->

Introduces a **stateful Rust iterator** (Quantum Data Loader) that yields one encoded batch (DLPack) per step, so Python can drive encoding with `for qt in loader:` instead of a closed benchmark loop. The same Rust core supports both **benchmark** (full pipeline, stats only) and **data loader** (batch-by-batch iteration). The public Python API is moved out of `benchmark/` into a package **`qumat_qdp`**; benchmark scripts import from `qumat_qdp` and remain the primary consumers.

### qdp-core (Rust)
- **`pipeline_runner.rs`** (new): `PipelineConfig`, `PipelineRunResult`, `run_throughput_pipeline`, `run_latency_pipeline`; shared `vector_len`, `generate_batch`, `fill_sample`. Adds the **`DataSource`** enum (Synthetic only; File reserved for Phase 2) and **`PipelineIterator`** with `new_synthetic(engine, config)` and `next_batch(&mut self) -> Result<Option<*mut DLManagedTensor>>`, which reuses `encode_batch`.
- **`lib.rs`**: `QdpEngine` implements **`Clone`**; re-exports `DataSource`, `PipelineIterator`, `PipelineConfig`, `PipelineRunResult`, `run_throughput_pipeline`, `run_latency_pipeline` (Linux only).
- **`gpu/encodings/amplitude.rs`**: `run_amplitude_dual_stream_pipeline` and exposure of the dual-stream encode path used by benchmarks.

### qdp-python (Rust bindings)
- **`SendPtr`** wrapper: wraps `*mut DLManagedTensor` so the raw pointer can cross `py.allow_threads` (the closure's return value must be `Send`). Used only to release the GIL during encode.
- **`PyQuantumLoader`** (`#[pyclass]`): holds `Option<PipelineIterator>`. Implements the Python iterator protocol: `__iter__` returns `self`; `__next__` takes the iterator out with `take()`, runs `next_batch()` inside `py.allow_threads`, then restores the iterator or returns a `QuantumTensor`. Raises `StopIteration` when exhausted. Stub on non-Linux.
- **`QdpEngine::create_synthetic_loader(...)`**: builds `PipelineConfig`, calls `PipelineIterator::new_synthetic(engine.clone(), config)`, returns `PyQuantumLoader`. Stub on non-Linux.
- **`run_throughput_pipeline_py`**: the pipeline run is executed inside **`py.allow_threads`** so the full benchmark loop runs with the GIL released (replacing previous detach/allow_threads usage as appropriate).

### qumat_qdp (Python package)
- **`qumat_qdp/`** at project root:
  - `__init__.py` (re-exports `QdpEngine`, `QuantumTensor`, `QdpBenchmark`, `ThroughputResult`, `LatencyResult`, `QuantumDataLoader`, `run_throughput_pipeline_py`)
  - **`api.py`** (QdpBenchmark, ThroughputResult, LatencyResult; calls `_qdp.run_throughput_pipeline_py`)
  - **`loader.py`** (QuantumDataLoader builder; `__iter__` calls `engine.create_synthetic_loader` and returns the Rust iterator)
- **`pyproject.toml`**: **`python-source = "."`** so the root `qumat_qdp` package is included in the wheel.

### Benchmark scripts
- **Imports**: All benchmark scripts now use `from qumat_qdp import ...` (or re-export). **`benchmark/api.py`** and **`benchmark/loader.py`** only re-export from `qumat_qdp` for backward compatibility.
- **Import path**: Each script inserts the project root into `sys.path` before importing `qumat_qdp`, so `uv run python benchmark/run_pipeline_baseline.py` (and similar) works without extra `PYTHONPATH`.
- **`benchmark_loader_throughput.py`** (new): runs throughput by iterating `QuantumDataLoader` (`for qt in loader`) and compares with `QdpBenchmark.run_throughput()` (full Rust pipeline).

## Behaviour
- **Benchmark path**: Unchanged from the caller’s perspective. `QdpBenchmark(device_id=0).qubits(16).batches(100, 64).run_throughput()` still runs the full pipeline in Rust with the GIL released via `run_throughput_pipeline_py`.
- **Loader path**: `QuantumDataLoader(device_id=0).qubits(16).batches(100, 64).source_synthetic()` then `for qt in loader:` yields one `QuantumTensor` (batch) per iteration; GIL is released during each `next_batch()` in Rust.

## Testing
- Existing benchmark flow: `uv run python benchmark/run_pipeline_baseline.py`, `benchmark_throughput.py`, `benchmark_latency.py`.
- Loader throughput: `uv run python benchmark/benchmark_loader_throughput.py` (compares loader iteration vs full pipeline).

### Related Issues or PRs
<!-- Add links to related issues or PRs. -->
<!-- - Closes #123 -->
<!-- - Related to #123 -->
Related to #969

### Changes Made
<!-- Please mark one with an "x" -->
- [ ] Bug fix
- [x] New feature
- [x] Refactoring
- [ ] Documentation
- [ ] Test
- [ ] CI/CD pipeline
- [ ] Other

### Breaking Changes
<!-- Does this PR introduce a breaking change? -->
- [x] Yes
- [ ] No

### Checklist
<!-- Please mark each item with an "x" when complete -->
<!-- If not all items are complete, please open this as a **Draft PR**. Once all requirements are met, mark as ready for review. -->
- [x] Added or updated unit tests for all changes
- [x] Added or updated documentation for all changes
- [x] Successfully built and ran all unit tests or manual tests locally
- [ ] PR title follows "MAHOUT-XXX: Brief Description" format (if related to an issue)
- [x] Code follows ASF guidelines

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
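
For readers without a GPU build, the loader path described under Behaviour can be approximated in pure Python. This is a minimal sketch, not the `qumat_qdp` implementation: `SketchLoader`, `SketchBatchIterator`, and `SketchBatch` are hypothetical stand-ins, the batches are random floats rather than DLPack tensors, and two things are assumed rather than stated in the description: that `batches(100, 64)` means (batch count, batch size), and that one encoded sample holds `2**num_qubits` amplitudes.

```python
import random
from dataclasses import dataclass


@dataclass
class SketchBatch:
    """Hypothetical stand-in for a QuantumTensor (plain lists, no DLPack, no GPU)."""
    rows: list


class SketchBatchIterator:
    """Stateful iterator: yields total_batches batches, then raises StopIteration."""

    def __init__(self, num_qubits, total_batches, batch_size, seed=0):
        # Assumption: one encoded sample holds 2**num_qubits amplitudes.
        self._vector_len = 2 ** num_qubits
        self._remaining = total_batches
        self._batch_size = batch_size
        self._rng = random.Random(seed)

    def __iter__(self):
        return self

    def __next__(self):
        if self._remaining == 0:
            raise StopIteration
        self._remaining -= 1
        rows = [
            [self._rng.random() for _ in range(self._vector_len)]
            for _ in range(self._batch_size)
        ]
        return SketchBatch(rows)


class SketchLoader:
    """Builder mirroring the chained call style shown in the description."""

    def __init__(self, device_id=0):
        self._device_id = device_id  # kept only to mirror the call shape; unused here
        self._num_qubits = None
        self._batch_shape = None
        self._synthetic = False

    def qubits(self, n):
        self._num_qubits = n
        return self

    def batches(self, total_batches, batch_size):
        # Assumption: first argument is the batch count, second the batch size.
        self._batch_shape = (total_batches, batch_size)
        return self

    def source_synthetic(self):
        self._synthetic = True
        return self

    def __iter__(self):
        # Each `for` loop gets a fresh stateful iterator, as loader.py is described to do.
        if self._num_qubits is None or self._batch_shape is None or not self._synthetic:
            raise ValueError("call qubits(), batches() and source_synthetic() first")
        total_batches, batch_size = self._batch_shape
        return SketchBatchIterator(self._num_qubits, total_batches, batch_size)


loader = SketchLoader(device_id=0).qubits(3).batches(4, 2).source_synthetic()
shapes = [(len(qt.rows), len(qt.rows[0])) for qt in loader]
print(shapes)
```

In the sketch the builder holds only configuration and the iterator holds the remaining-batch counter, so a second `for qt in loader:` starts again from a full count. The description suggests the same split between `QuantumDataLoader` and the Rust `PipelineIterator`, but the real classes may differ in detail.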
