This is an automated email from the ASF dual-hosted git repository.
jiekaichang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/mahout.git
The following commit(s) were added to refs/heads/main by this push:
new 563266f5e Add a pipeline Baseline (#972)
563266f5e is described below
commit 563266f5ed99f6b1e1b7908cb50c510c6426baeb
Author: KUAN-HAO HUANG <[email protected]>
AuthorDate: Thu Jan 29 23:57:15 2026 +0800
Add a pipeline Baseline (#972)
---
qdp/qdp-python/benchmark/run_pipeline_baseline.py | 271 ++++++++++++++++++++++
1 file changed, 271 insertions(+)
diff --git a/qdp/qdp-python/benchmark/run_pipeline_baseline.py b/qdp/qdp-python/benchmark/run_pipeline_baseline.py
new file mode 100644
index 000000000..c813c8333
--- /dev/null
+++ b/qdp/qdp-python/benchmark/run_pipeline_baseline.py
@@ -0,0 +1,271 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Baseline benchmark driver for QDP optimization.
+
+Runs throughput and latency benchmarks multiple times (default 5), computes
+median/p95, gathers system metadata, and writes CSV + markdown report to
+qdp/docs/optimization/results/.
+
+Set observability environment variables before running (recommended):
+ export QDP_ENABLE_POOL_METRICS=1
+ export QDP_ENABLE_OVERLAP_TRACKING=1
+ export RUST_LOG=info
+
+Usage:
+ cd qdp/qdp-python/benchmark
+  uv run python run_pipeline_baseline.py --qubits 16 --batch-size 64 \
+      --prefetch 16 --batches 500 --trials 20
+"""
+
+from __future__ import annotations
+
+import argparse
+import os
+import subprocess
+import sys
+from datetime import datetime, timezone
+from pathlib import Path
+
+import numpy as np
+
+# Set observability env before importing Rust-backed modules (so the pipeline
+# sees them).
+os.environ.setdefault("QDP_ENABLE_POOL_METRICS", "1")
+os.environ.setdefault("QDP_ENABLE_OVERLAP_TRACKING", "1")
+os.environ.setdefault("RUST_LOG", "info")
+
+from benchmark_latency import run_mahout as run_mahout_latency
+from benchmark_throughput import run_mahout as run_mahout_throughput
+
+
+def _repo_root() -> Path:
+ # benchmark -> qdp-python -> qdp -> mahout (workspace root)
+ return Path(__file__).resolve().parent.parent.parent.parent
+
+
+def _results_dir() -> Path:
+ return _repo_root() / "qdp" / "docs" / "optimization" / "results"
+
+
+def get_git_commit(repo_root: Path) -> str:
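+    """Return the short (12-char) HEAD commit hash, or "unknown" on failure."""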
+ try:
+ r = subprocess.run(
+ ["git", "rev-parse", "HEAD"],
+ capture_output=True,
+ text=True,
+ cwd=repo_root,
+ timeout=5,
+ )
+ if r.returncode == 0 and r.stdout:
+ return r.stdout.strip()[:12]
+ except Exception:
+ pass
+ return "unknown"
+
+
+def get_gpu_info() -> tuple[str, str, str]:
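+    """Return (gpu_name, driver_version, cuda_version), falling back to "unknown"."""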
+ gpu = driver = cuda = "unknown"
+ try:
+ import torch
+
+ if torch.cuda.is_available():
+ gpu = torch.cuda.get_device_name(0) or "unknown"
+ cuda = getattr(torch.version, "cuda", None) or "unknown"
+ # Driver from nvidia-smi if available
+ r = subprocess.run(
+ [
+ "nvidia-smi",
+ "--query-gpu=name,driver_version",
+ "--format=csv,noheader",
+ ],
+ capture_output=True,
+ text=True,
+ timeout=5,
+ )
+ if r.returncode == 0 and r.stdout:
+ line = r.stdout.strip().split("\n")[0]
+ parts = [p.strip() for p in line.split(",")]
+                if parts and gpu == "unknown":
+                    gpu = parts[0]
+                if len(parts) >= 2:
+                    driver = parts[1]
+ except Exception:
+ pass
+ return gpu, driver, cuda
+
+
+def run_throughput_trials(
+ qubits: int,
+ batches: int,
+ batch_size: int,
+ prefetch: int,
+ trials: int,
+ encoding: str,
+) -> list[float]:
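+    """Run the throughput benchmark `trials` times, keeping positive results."""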
+ throughputs: list[float] = []
+    for _ in range(trials):
+ _duration, throughput = run_mahout_throughput(
+ qubits, batches, batch_size, prefetch, encoding
+ )
+ if throughput > 0:
+ throughputs.append(throughput)
+ return throughputs
+
+
+def run_latency_trials(
+ qubits: int,
+ batches: int,
+ batch_size: int,
+ prefetch: int,
+ trials: int,
+ encoding: str,
+) -> list[float]:
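+    """Run the latency benchmark `trials` times, keeping positive results."""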
+ latencies_ms: list[float] = []
+    for _ in range(trials):
+ _duration, latency_ms = run_mahout_latency(
+ qubits, batches, batch_size, prefetch, encoding
+ )
+ if latency_ms > 0:
+ latencies_ms.append(latency_ms)
+ return latencies_ms
+
+
+def main() -> int:
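+    """Parse CLI args, run the requested trials, and write the markdown report."""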
+ parser = argparse.ArgumentParser(
+ description="Run baseline benchmarks and write CSV + report."
+ )
+ parser.add_argument("--qubits", type=int, default=16)
+ parser.add_argument("--batch-size", type=int, default=64)
+ parser.add_argument("--prefetch", type=int, default=16)
+ parser.add_argument("--batches", type=int, default=200)
+ parser.add_argument("--trials", type=int, default=5)
+ parser.add_argument(
+ "--encoding-method",
+ type=str,
+ default="amplitude",
+ choices=["amplitude", "angle", "basis"],
+ )
+ parser.add_argument(
+ "--output-prefix",
+ type=str,
+ default="pipeline_baseline",
+ help="Prefix for output files (e.g. pipeline_baseline ->
pipeline_baseline_YYYYMMDD_rep_config).",
+ )
+ parser.add_argument(
+ "--skip-throughput",
+ action="store_true",
+ help="Skip throughput trials.",
+ )
+ parser.add_argument(
+ "--skip-latency",
+ action="store_true",
+ help="Skip latency trials.",
+ )
+ args = parser.parse_args()
+
+ repo_root = _repo_root()
+ results_dir = _results_dir()
+ results_dir.mkdir(parents=True, exist_ok=True)
+
+    date_str = datetime.now(timezone.utc).strftime("%Y%m%d")
+ config_tag = "rep_config"
+ base_name = f"{args.output_prefix}_{date_str}_{config_tag}"
+
+ commit = get_git_commit(repo_root)
+ gpu, driver, cuda = get_gpu_info()
+
+ throughputs: list[float] = []
+ latencies_ms: list[float] = []
+
+ if not args.skip_throughput:
+ print(
+ f"Running throughput: {args.trials} trials (qubits={args.qubits},
batch_size={args.batch_size}, prefetch={args.prefetch}, batches={args.batches})"
+ )
+ throughputs = run_throughput_trials(
+ args.qubits,
+ args.batches,
+ args.batch_size,
+ args.prefetch,
+ args.trials,
+ args.encoding_method,
+ )
+ if throughputs:
+ print(
+ f" Throughput: median={np.median(throughputs):.1f} vec/s,
p95={np.percentile(throughputs, 95):.1f} vec/s"
+ )
+
+ if not args.skip_latency:
+ print(f"Running latency: {args.trials} trials")
+ latencies_ms = run_latency_trials(
+ args.qubits,
+ args.batches,
+ args.batch_size,
+ args.prefetch,
+ args.trials,
+ args.encoding_method,
+ )
+ if latencies_ms:
+ print(
+ f" Latency: median={np.median(latencies_ms):.3f} ms/vec,
p95={np.percentile(latencies_ms, 95):.3f} ms/vec"
+ )
+
+ # Stats (used in markdown report)
+ throughput_median = float(np.median(throughputs)) if throughputs else 0.0
+    throughput_p95 = float(np.percentile(throughputs, 95)) if throughputs else 0.0
+ latency_p50 = float(np.median(latencies_ms)) if latencies_ms else 0.0
+    latency_p95 = float(np.percentile(latencies_ms, 95)) if latencies_ms else 0.0
+    date_iso = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+
+ # Markdown report
+ md_lines = [
+ "# pipeline baseline report",
+ "",
+ f"- **Date**: {date_iso}",
+ f"- **Git commit**: {commit}",
+ f"- **GPU**: {gpu}",
+ f"- **Driver**: {driver}",
+ f"- **CUDA**: {cuda}",
+ "",
+ "## Parameters",
+ "",
+ f"- qubits: {args.qubits}",
+ f"- batch_size: {args.batch_size}",
+ f"- prefetch: {args.prefetch}",
+ f"- batches: {args.batches}",
+ f"- trials: {args.trials}",
+ f"- encoding: {args.encoding_method}",
+ "",
+ "## Results",
+ "",
+ "| Metric | Median | P95 |",
+ "|--------|--------|-----|",
+ f"| Throughput (vectors/sec) | {throughput_median:.1f} |
{throughput_p95:.1f} |",
+ f"| Latency (ms/vector) | {latency_p50:.3f} | {latency_p95:.3f} |",
+ "",
+ "---",
+ "",
+ "*Generated by run_pipeline_baseline.py*",
+ ]
+ md_path = results_dir / f"{base_name}.md"
+ md_path.write_text("\n".join(md_lines), encoding="utf-8")
+ print(f"Wrote {md_path}")
+
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())