This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 8001e4c86e feat(bench): add Arrow Flight E2E benchmark + Benchmarks CI workflow (#5557)
8001e4c86e is described below
commit 8001e4c86e8d60971887ad7509b88c42a9fd1ad5
Author: Yicong Huang <[email protected]>
AuthorDate: Thu Jun 11 15:29:10 2026 -0700
feat(bench): add Arrow Flight E2E benchmark + Benchmarks CI workflow (#5557)
### What changes were proposed in this PR?
A bench-agnostic CI lifecycle that future suites (e.g. JMH for
`ArrowUtils` micros) plug into by appending one line to
`bin/run-benchmarks.sh`, plus the first concrete suite: an end-to-end
Arrow Flight + `PythonWorkflowWorker` micro-bench.
**Lifecycle**
| Trigger | Mode | PR comment | Publish to gh-pages |
|---|---|---|---|
| `pull_request` (label-gated, mirrors `amber-integration`'s set) | `pr` — 3 configs × 20 batches (~5 min) | ✓ | — |
| `push` to `main` | `pr` (post-merge fast signal) | — | ✓ |
| `schedule` Sundays 08:00 UTC | `full` — 36 configs × 200 batches (~50-60 min) | — | ✓ |
| `workflow_dispatch` | `full` | — | — |
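The trigger-to-behavior mapping in the table above can be sketched as a small pure function. This is only an illustration of the table, not code from this PR: the function name `benchPlan` and the returned field names are hypothetical, while the `full`/`pr` split follows the workflow's `BENCH_MODE` expression.

```javascript
// Hypothetical helper mirroring the lifecycle table; not part of the PR.
// eventName is the GitHub event name, ref is the git ref of the run.
function benchPlan(eventName, ref) {
  const full = eventName === "schedule" || eventName === "workflow_dispatch";
  const onMain = eventName === "push" && ref === "refs/heads/main";
  return {
    mode: full ? "full" : "pr",              // value passed as BENCH_MODE
    prComment: eventName === "pull_request", // only PR runs get a comment
    publish: onMain || eventName === "schedule", // gh-pages publish gate
  };
}

console.log(benchPlan("pull_request", "refs/pull/1/merge"));
console.log(benchPlan("workflow_dispatch", "refs/heads/main"));
```

Keeping the decision in one place like this makes it easy to check that a manual `workflow_dispatch` run gets the full grid but never publishes.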
PR runs upload the bench results as an artifact and render a markdown
summary table on the workflow page. The `workflow_run`-triggered
`Benchmarks PR Comment` listener downloads the artifact, sanitizes the
CSV, and upserts a single marker-tagged PR comment. It lives in a
separate file because `pull_request` runs from forks get a read-only
token and zero secret access. The workflow is non-blocking: it is not
part of `required-checks.yml`'s aggregator.
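A minimal sketch of the sanitize-and-upsert idea, assuming the artifact CSV is untrusted text and the PR's comments are already fetched as plain objects. The helper names `sanitizeCsv` and `upsertAction` are hypothetical, and the zero-width-space separator is one possible way to neutralize a code fence; only the marker string is taken from the workflow file.

```javascript
// Illustrative only; function names are hypothetical, not from the PR.
const MARKER = "<!-- texera-benchmarks-comment -->";
const ZWSP = "\u200b"; // zero-width space

// Cap the size, then split every run of 3+ backticks with zero-width
// spaces so the text cannot terminate a surrounding markdown code fence.
function sanitizeCsv(raw, maxChars) {
  const capped =
    raw.length > maxChars ? raw.slice(0, maxChars) + "\n[truncated]" : raw;
  return capped.replace(/`{3,}/g, (run) => run.split("").join(ZWSP)).trim();
}

// Decide whether to update the existing marker comment or create one.
function upsertAction(comments, marker) {
  const existing = comments.find((c) => c.body && c.body.includes(marker));
  return existing ? { op: "update", id: existing.id } : { op: "create" };
}

console.log(upsertAction([{ id: 7, body: MARKER + "\nold table" }], MARKER));
```

Searching for a hidden HTML-comment marker rather than matching on the comment author or title keeps the upsert stable even if the rendered heading changes later.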
**First benchmark: Arrow Flight E2E (`ArrowFlightActorBench`)**
Spawns a real `PythonWorkflowWorker` actor (real Pekko mailbox + real
`texera_run_python_worker.py` subprocess + real Arrow Flight gRPC) wired
to an identity Python UDF, then times per-batch send→echo round-trip
across a sweep of `batch_size × schema_width × string_len`. Per-config
output: throughput (tuples/s, MB/s), latency p50/p95/p99, total ms. Each
config writes incrementally so a killed sweep still leaves usable
artifacts.
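As a rough illustration of how the per-config metrics listed above relate to the raw per-batch timings, the sketch below derives them from an array of round-trip latencies. It rests on assumptions not confirmed by this message: nearest-rank percentiles, total time taken as the sum of per-batch latencies (batches sent one at a time), and 1 MB = 1024 × 1024 bytes. The names `percentile` and `summarize` are hypothetical.

```javascript
// Nearest-rank percentile over an ascending-sorted array (assumed method;
// the bench's actual percentile computation is not shown here).
function percentile(sortedValues, p) {
  if (sortedValues.length === 0) return NaN;
  const rank = Math.ceil((p / 100) * sortedValues.length);
  return sortedValues[Math.max(rank, 1) - 1];
}

// latenciesUs: per-batch round-trip times in microseconds.
// batchSize: tuples per batch; bytesPerBatch: payload size per batch.
function summarize(latenciesUs, batchSize, bytesPerBatch) {
  const sorted = [...latenciesUs].sort((a, b) => a - b);
  const totalUs = latenciesUs.reduce((sum, v) => sum + v, 0);
  const totalSec = totalUs / 1e6;
  const n = latenciesUs.length;
  return {
    tuplesPerSec: (batchSize * n) / totalSec,
    mbPerSec: (bytesPerBatch * n) / (1024 * 1024) / totalSec,
    p50Us: percentile(sorted, 50),
    p95Us: percentile(sorted, 95),
    p99Us: percentile(sorted, 99),
    totalMs: totalUs / 1000,
  };
}

console.log(summarize([1000, 2000, 3000, 4000], 100, 1024 * 1024));
```

With only 20 batches per config in `pr` mode, p99 under nearest-rank is simply the slowest batch, so PR-mode tail latencies are best read as a coarse signal.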
ASF: `benchmark-action/github-action-benchmark` is SHA-pinned to
`52576c92bccf6ac60c8223ec7eb2565637cae9ba` (v1.22.1) per the
apache-infrastructure-actions allow-list.
### Any related issues, documentation, discussions?
Closes #5556
### How was this PR tested?
End-to-end validated on a fork-internal PR —
[Yicong-Huang/texera#17](https://github.com/Yicong-Huang/texera/pull/17)
ran the full `Benchmarks` workflow, the `workflow_run` listener fired,
and a marker-tagged comment landed and upserted across two push cycles
([rendered
example](https://github.com/Yicong-Huang/texera/pull/17#issuecomment-4645589605)).
`workflow_run` fires only for workflow files that exist on the default
branch, so the loop can't be tested from a non-default branch; that's
why the dry-run lived on a fork. After merge, the same flow takes effect
on `apache/texera:main` automatically.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7)
---
.github/workflows/benchmarks-pr-comment.yml | 261 +++++++++
.github/workflows/benchmarks.yml | 327 ++++++++++++
amber/build.sbt | 7 +
.../texera/amber/bench/ArrowFlightActorBench.scala | 592 +++++++++++++++++++++
bin/run-benchmarks.sh | 59 ++
5 files changed, 1246 insertions(+)
diff --git a/.github/workflows/benchmarks-pr-comment.yml b/.github/workflows/benchmarks-pr-comment.yml
new file mode 100644
index 0000000000..f8bbae1a65
--- /dev/null
+++ b/.github/workflows/benchmarks-pr-comment.yml
@@ -0,0 +1,261 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Posts (or upserts) a PR comment with bench results AFTER the Benchmarks
+# workflow completes.
+#
+# Why a separate workflow_run-triggered file:
+# - The Benchmarks workflow runs on `pull_request`, which for fork PRs
+# gets a read-only GITHUB_TOKEN and zero secret access — GitHub's
+# hard-coded security model. We can't comment from there.
+# - `workflow_run` runs in the BASE repo's context (apache/texera)
+# with normal token + secret access, so it CAN comment on fork PRs.
+# - This is the ASF-approved pattern; `pull_request_target` is policy-
+# forbidden for any action that handles tokens.
+#
+# Why workflow_run is safe here vs pull_request_target:
+# - We only READ a small, opaque artifact (pr-number.txt + the bench
+# JSON / CSV) produced by the upstream run; we don't execute any
+# PR-author code in this workflow.
+# - The PR number is validated against ^[0-9]+$ before being used in
+# any API call, blocking ref injection.
+
+name: Benchmarks PR Comment
+
+on:
+ workflow_run:
+ workflows: ["Benchmarks"]
+ types: [completed]
+
+permissions:
+ # Need pull-requests: write to post / update the comment.
+ # contents: read is the default and enough to checkout for github-script
+ # which we don't actually do here (we only call REST APIs).
+ pull-requests: write
+ actions: read
+
+jobs:
+ comment:
+ # Only act when the upstream Benchmarks run was triggered by a PR.
+ # push-to-main / schedule / dispatch produce no PR to comment on.
+ if: ${{ github.event.workflow_run.event == 'pull_request' }}
+ runs-on: ubuntu-22.04
+ steps:
+ - name: Download bench-results artifact
+ uses: actions/github-script@v8
+ with:
+ script: |
+ const fs = require("fs");
+ const path = require("path");
+ const runId = context.payload.workflow_run.id;
+ const { data } = await github.rest.actions.listWorkflowRunArtifacts({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ run_id: runId,
+ });
+ const match = data.artifacts.find((a) => a.name.startsWith("bench-results-"));
+ if (!match) {
+ core.warning(`no bench-results-* artifact on run ${runId}; nothing to comment.`);
+ return;
+ }
+ const zip = await github.rest.actions.downloadArtifact({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ artifact_id: match.id,
+ archive_format: "zip",
+ });
+ fs.mkdirSync("bench-results-zip", { recursive: true });
+ fs.writeFileSync(path.join("bench-results-zip", "artifact.zip"), Buffer.from(zip.data));
+ core.info(`downloaded artifact ${match.name} (${match.size_in_bytes} bytes)`);
+
+ - name: Unzip artifact
+ run: |
+ mkdir -p bench-results
+ unzip -o bench-results-zip/artifact.zip -d bench-results
+ ls -la bench-results/
+
+ - name: Read PR number from artifact
+ id: pr
+ # Read + strictly validate (digits only) before using in API calls.
+ # The artifact comes from a fork-triggered workflow whose contents
+ # are not entirely trusted; numeric-only PR numbers block any
+ # injection vector through this value.
+ run: |
+ if [ ! -f bench-results/pr-number.txt ]; then
+ echo "no pr-number.txt in artifact; bailing out"
+ exit 0
+ fi
+ raw=$(tr -d '[:space:]' < bench-results/pr-number.txt)
+ if ! [[ "$raw" =~ ^[0-9]+$ ]]; then
+ echo "invalid pr-number.txt contents: '$raw'"
+ exit 1
+ fi
+ echo "number=$raw" >> "$GITHUB_OUTPUT"
+
+ - name: Upsert PR comment with bench summary
+ if: steps.pr.outputs.number != ''
+ uses: actions/github-script@v8
+ env:
+ PR_NUMBER: ${{ steps.pr.outputs.number }}
+ with:
+ script: |
+ const fs = require("fs");
+ const pr = Number(process.env.PR_NUMBER);
+ const marker = "<!-- texera-benchmarks-comment -->";
+
+ // CSV comes from a fork-PR-controlled artifact — sanitize before
+ // embedding in markdown:
+ // 1. Cap total size so a giant junk file can't bloat a comment.
+ // 2. Strip any triple-backtick sequence so the content cannot
+ // escape the surrounding code fence and inject arbitrary
+ // markdown (phishing links, image-rendering tricks, etc).
+ // Replacement with a zero-width char preserves byte alignment
+ // visually while neutralizing fence termination.
+ const MAX_CSV_BYTES = 32 * 1024;
+ const csvPath = "bench-results/arrow-flight-e2e.csv";
+ let csv = null;
+ if (fs.existsSync(csvPath)) {
+ let raw = fs.readFileSync(csvPath, "utf8");
+ if (raw.length > MAX_CSV_BYTES) {
+ raw = raw.slice(0, MAX_CSV_BYTES) + "\n[truncated]";
+ }
+ csv = raw.replace(/```+/g, "`\u200b`\u200b`").trim();
+ }
+
+ // Per-cell sanitizer for the markdown table: strip newlines, escape
+ // pipes (would break columns), and cap length.
+ const escapeCell = (s) =>
+ s == null
+ ? ""
+ : String(s).replace(/[\r\n]+/g, " ").replace(/\|/g, "\\|").slice(0, 64);
+
+ // Render selected columns as a markdown table. Drops noise columns
+ // (config_idx, total_tuples, total_bytes, lat_p95_us) and converts
+ // microseconds to milliseconds for latency readability. Returns
+ // null on any parsing failure → fallback renders raw CSV instead.
+ const csvToTable = (text) => {
+ try {
+ const rows = text
+ .trim()
+ .split(/\r?\n/)
+ .map((line) => line.split(","));
+ if (rows.length < 2) return null;
+ const header = rows[0].map((h) => h.trim());
+ const idx = (col) => header.indexOf(col);
+ const cols = [
+ { col: "batch_size", label: "batch", fmt: (v) => v },
+ { col: "schema_width", label: "schema_w", fmt: (v) => v },
+ { col: "string_len", label: "str_len", fmt: (v) => v },
+ { col: "num_batches", label: "n_batches", fmt: (v) => v },
+ { col: "tuples_per_sec", label: "tuples/s", fmt: (v) => v },
+ { col: "mb_per_sec", label: "MB/s", fmt: (v) => v },
+ {
+ col: "lat_p50_us",
+ label: "p50 ms",
+ fmt: (v) => (parseFloat(v) / 1000).toFixed(2),
+ },
+ {
+ col: "lat_p99_us",
+ label: "p99 ms",
+ fmt: (v) => (parseFloat(v) / 1000).toFixed(2),
+ },
+ { col: "total_ms", label: "total ms", fmt: (v) => v },
+ ].filter((c) => idx(c.col) >= 0);
+ if (cols.length === 0) return null;
+ const lines = [];
+ lines.push("| " + cols.map((c) => escapeCell(c.label)).join(" | ") + " |");
+ lines.push("|" + cols.map(() => "---:").join("|") + "|");
+ for (const row of rows.slice(1)) {
+ const cells = cols.map((c) => {
+ const raw = row[idx(c.col)];
+ try {
+ return escapeCell(c.fmt(raw));
+ } catch (e) {
+ return escapeCell(raw);
+ }
+ });
+ lines.push("| " + cells.join(" | ") + " |");
+ }
+ return lines.join("\n");
+ } catch (e) {
+ core.warning(`csvToTable failed: ${e.message}`);
+ return null;
+ }
+ };
+
+ // workflow_run.html_url is GitHub-emitted (URL to apache/texera
+ // run page); not attacker-influenceable.
+ const upstreamUrl = context.payload.workflow_run.html_url;
+
+ // Primary view: rendered markdown table for skim-readability.
+ // Fallback view (collapsed <details>): raw sanitized CSV for full
+ // verifiability — readers click to expand if they need every column.
+ const tableMd = csv ? csvToTable(csv) : null;
+ const bodyParts = [marker, "## Arrow Flight E2E bench", ""];
+ if (tableMd) {
+ bodyParts.push(tableMd, "");
+ } else if (!csv) {
+ bodyParts.push("_(no arrow-flight-e2e.csv in artifact)_", "");
+ } else {
+ bodyParts.push("_(unable to parse CSV; raw below)_", "");
+ }
+ if (csv) {
+ bodyParts.push(
+ "<details><summary>Raw CSV</summary>",
+ "",
+ "```csv",
+ csv,
+ "```",
+ "",
+ "</details>",
+ ""
+ );
+ }
+ bodyParts.push(`[Full workflow run](${upstreamUrl})`);
+ const body = bodyParts.join("\n");
+
+ // Find existing marker comment so subsequent runs upsert in place.
+ // Paginate via `paginate` so a long-running PR with >100 comments
+ // still locates the marker — otherwise we'd silently create a
+ // duplicate every push past the 100-comment ceiling.
+ const allComments = await github.paginate(
+ github.rest.issues.listComments,
+ {
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: pr,
+ per_page: 100,
+ }
+ );
+ const existing = allComments.find((c) => c.body && c.body.includes(marker));
+ if (existing) {
+ await github.rest.issues.updateComment({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ comment_id: existing.id,
+ body,
+ });
+ core.info(`updated comment ${existing.id} on PR #${pr}`);
+ } else {
+ await github.rest.issues.createComment({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: pr,
+ body,
+ });
+ core.info(`created new comment on PR #${pr}`);
+ }
diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml
new file mode 100644
index 0000000000..83da742857
--- /dev/null
+++ b/.github/workflows/benchmarks.yml
@@ -0,0 +1,327 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Texera benchmarks — bench-agnostic umbrella workflow.
+#
+# This file is the single CI entry for ALL Texera performance benchmarks
+# (currently Arrow Flight E2E; JMH and others land here as well). The
+# workflow knows nothing about specific benches — bin/run-benchmarks.sh
+# is the opaque entry point that owns which benches run and where their
+# outputs land under bench-results/. Adding a new bench is:
+# 1. Append the run command to bin/run-benchmarks.sh.
+# 2. Add a `Publish <chart-name>` step block below pointing at the
+# bench's JSON output file with the right `tool:` setting.
+# This workflow file otherwise stays unchanged.
+#
+# Triggering — mirrors amber-integration's label gate (NOT file paths):
+# - PR: runs only when one of the labels mapped to the amber-integration
+# stack in required-checks.yml's LABEL_STACKS is present on the PR.
+# Labels are applied by the .github/labeler.yml workflow on opened /
+# synchronize, so we wait for that workflow to complete before
+# deciding (same pattern required-checks.yml uses).
+# - push to main: always runs (same trimmed grid as PR for quick post-
+# merge signal) and publishes to gh-pages.
+# - schedule (weekly): runs the full 36-config sweep and publishes to
+# gh-pages — this is the authoritative long-term baseline.
+# - workflow_dispatch: manual full-grid run (no publish; bring-your-own
+# trigger for ad-hoc exploration).
+#
+# Two modes via BENCH_MODE env (read by the bench Scala main):
+# pr — 3 configs × 20 batches, ~5 min (PR + push-to-main)
+# full — 36 configs × 200 batches, ~50-60 min (schedule + dispatch)
+#
+# Non-blocking: this workflow is NOT included in required-checks.yml's
+# `required-checks` aggregator, so its result doesn't gate merges even
+# when it fails. Adding it to branch protection later is a deliberate
+# .asf.yaml change.
+#
+# Permissions:
+# contents: write — needed by benchmark-action's auto-push to gh-pages.
+# PR runs (which GitHub auto-downgrades to read-only on forks) gate
+# auto-push off via the event check, so the missing write is never
+# exercised.
+
+name: Benchmarks
+
+on:
+ push:
+ branches: [main]
+ pull_request:
+ types: [opened, reopened, synchronize, labeled, unlabeled]
+ schedule:
+ # Weekly full-grid baseline refresh, Sundays 08:00 UTC. PR and post-
+ # merge runs use a trimmed 3-config grid to stay around 5 min; the
+ # scheduled run covers the full 36-config sweep that the gh-pages
+ # dashboard tracks long-term.
+ - cron: "0 8 * * 0"
+ workflow_dispatch:
+
+permissions:
+ contents: write
+
+concurrency:
+ group: benchmarks-${{ github.ref }}
+ # On main: never cancel an in-flight baseline run; on PRs: supersede.
+ cancel-in-progress: ${{ github.ref != 'refs/heads/main' }}
+
+jobs:
+ precheck:
+ # Decide whether to run based on PR labels (push / dispatch always
+ # run). Lifted from required-checks.yml's precheck so the trigger
+ # surface matches amber-integration exactly.
+ name: Precheck
+ runs-on: ubuntu-latest
+ outputs:
+ run_bench: ${{ steps.decide.outputs.run_bench }}
+ steps:
+ - name: Wait for Pull Request Labeler
+ if: github.event_name == 'pull_request'
+ uses: actions/github-script@v8
+ with:
+ script: |
+ const ref = context.payload.pull_request.head.sha;
+ const maxAttempts = 30;
+ for (let i = 0; i < maxAttempts; i++) {
+ const { data } = await github.rest.checks.listForRef({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ ref,
+ check_name: "labeler",
+ });
+ const check = data.check_runs[0];
+ if (check && check.status === "completed") {
+ core.info(`labeler ${check.conclusion}`);
+ return;
+ }
+ core.info(`labeler not ready (attempt ${i + 1}/${maxAttempts})`);
+ await new Promise((r) => setTimeout(r, 10000));
+ }
+ core.warning("labeler did not complete within 5 minutes; proceeding with current labels.");
+
+ - name: Decide whether to run bench
+ id: decide
+ uses: actions/github-script@v8
+ with:
+ script: |
+ const eventName = context.eventName;
+ if (eventName !== "pull_request") {
+ // push to main / workflow_dispatch always run.
+ core.info(`event=${eventName} — running unconditionally`);
+ core.setOutput("run_bench", "true");
+ return;
+ }
+ // Re-fetch labels: the labeler may have just added some.
+ const { data: pr } = await github.rest.pulls.get({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ pull_number: context.payload.pull_request.number,
+ });
+ const labels = pr.labels.map((l) => l.name);
+ core.info(`PR labels: ${labels.join(", ") || "(none)"}`);
+ // Mirrors LABEL_STACKS in required-checks.yml: every label
+ // whose stack list contains "amber-integration" triggers this
+ // bench. Keep in sync if LABEL_STACKS there changes.
+ const TRIGGER_LABELS = new Set([
+ "pyamber",
+ "engine",
+ "amber-integration",
+ "common",
+ "ddl-change",
+ "ci",
+ ]);
+ const matched = labels.filter((l) => TRIGGER_LABELS.has(l));
+ const shouldRun = matched.length > 0;
+ core.info(
+ shouldRun
+ ? `Triggering on labels: ${matched.join(", ")}`
+ : "No trigger label present; skipping bench."
+ );
+ core.setOutput("run_bench", shouldRun ? "true" : "false");
+
+ bench:
+ name: Bench
+ needs: precheck
+ if: ${{ needs.precheck.outputs.run_bench == 'true' }}
+ runs-on: ubuntu-22.04
+ env:
+ JAVA_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
+ JVM_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
+ # `pr` mode = 3-config trimmed sweep (~5 min) for PR + post-merge.
+ # `full` mode = 36-config sweep (~50-60 min) for schedule + manual.
+ # Read by the bench Scala main (see GridSpec switch); workflow only
+ # decides which mode to pass.
+ BENCH_MODE: ${{ (github.event_name == 'schedule' || github.event_name == 'workflow_dispatch') && 'full' || 'pr' }}
+ services:
+ # The bench itself doesn't touch the DB, but sbt's transitive compile
+ # chain reaches `common/auth` which imports JOOQ-generated classes
+ # from `org.apache.texera.dao.jooq.generated.*`. JOOQ codegen at
+ # sbt compile time requires a live Postgres to introspect against;
+ # without it the auth module's `User` / `UserRoleEnum` symbols fail
+ # to resolve and the whole bench compile aborts. Mirrors the same
+ # service block from amber-integration in build.yml.
+ postgres:
+ image: postgres
+ env:
+ POSTGRES_PASSWORD: postgres
+ ports:
+ - 5432:5432
+ options: >-
+ --health-cmd="pg_isready -U postgres"
+ --health-interval=10s
+ --health-timeout=5s
+ --health-retries=5
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v5
+ with:
+ fetch-depth: 0
+ - name: Setup JDK
+ uses: actions/setup-java@v5
+ with:
+ distribution: "temurin"
+ java-version: 17
+ - name: Setup Python
+ uses: actions/setup-python@v6
+ with:
+ python-version: "3.12"
+ - name: Install Python dependencies
+ # Mirrors amber-integration's installer in build.yml so the bench
+ # subprocess imports resolve identically (pytorch CPU index +
+ # betterproto plugin via dev-requirements).
+ run: |
+ python -m pip install uv
+ if [ -f amber/requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/requirements.txt; fi
+ if [ -f amber/operator-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; fi
+ if [ -f amber/dev-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/dev-requirements.txt; fi
+ - name: Install protoc
+ run: |
+ PROTOC_VERSION=$(cat bin/protoc-version.txt)
+ curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip"
+ sudo unzip -o /tmp/protoc.zip -d /usr/local
+ sudo chmod +x /usr/local/bin/protoc
+ sudo chmod -R a+rX /usr/local/include/google
+ - name: Create Database for JOOQ codegen
+ # Minimal subset of amber-integration's "Create Databases" step —
+ # JOOQ only introspects against texera_db, not iceberg/lakefs/
+ # lakekeeper schemas which the bench never touches.
+ run: psql -h localhost -U postgres -f sql/texera_ddl.sql
+ env:
+ PGPASSWORD: postgres
+ - name: Generate Python proto bindings
+ run: bash bin/python-proto-gen.sh
+ - name: Setup sbt launcher
+ uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
+ - uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 # v8.1.0
+ with:
+ extraSbtFiles: '["*.sbt", "project/**.{scala,sbt}", "project/build.properties" ]'
+
+ - name: Run benchmarks
+ # Single opaque entry point — this workflow doesn't know which
+ # benches exist. Adding a JMH suite later = appending one line
+ # inside bin/run-benchmarks.sh and adding a publish step below.
+ run: bash bin/run-benchmarks.sh
+
+ - name: Stash PR number for downstream comment workflow
+ # PR fork workflows can't comment (GitHub forces read-only token);
+ # benchmarks-pr-comment.yml runs separately via workflow_run with
+ # proper write access, and needs the PR number to find the target.
+ # github.event.workflow_run.pull_requests is empty for fork PRs,
+ # so we ferry the number via artifact.
+ if: ${{ github.event_name == 'pull_request' && !cancelled() }}
+ env:
+ PR_NUMBER: ${{ github.event.pull_request.number }}
+ run: echo "$PR_NUMBER" > bench-results/pr-number.txt
+
+ - name: Render bench summary
+ # Render the bench CSV into a markdown table on the workflow run
+ # page. Visible without further clicks — and doesn't need any
+ # extra permissions (writes to $GITHUB_STEP_SUMMARY only).
+ if: ${{ !cancelled() }}
+ run: |
+ {
+ echo "## Bench results (\`$BENCH_MODE\` mode)"
+ echo
+ if [ -f bench-results/arrow-flight-e2e.csv ]; then
+ echo '```csv'
+ cat bench-results/arrow-flight-e2e.csv
+ echo '```'
+ else
+ echo "_(no bench-results/arrow-flight-e2e.csv produced)_"
+ fi
+ } >> "$GITHUB_STEP_SUMMARY"
+
+ - name: Upload bench artifacts
+ if: ${{ !cancelled() }}
+ uses: actions/upload-artifact@v4
+ with:
+ name: bench-results-${{ github.run_id }}
+ path: bench-results/
+ retention-days: 14
+
+ # Publish to the gh-pages dashboard. auto-push + save-data-file are
+ # both gated on push-to-main / schedule: PR runs only emit the job
+ # summary and the uploaded artifact, never touching the tracked
+ # baseline. Adding a new benchmark = adding one publish block below
+ # matching the JSON filename convention in bin/run-benchmarks.sh.
+ #
+ # `skip-fetch-gh-pages: true` because gh-pages does not exist on
+ # apache/texera yet — without this the action fatals with
+ # `couldn't find remote ref gh-pages` even before the auto-push
+ # decision. auto-push: true on push/schedule still creates the
+ # branch on first write. Once the dashboard is seeded, flip this
+ # to false to re-enable baseline comparison + alert-threshold.
+ #
+ # `continue-on-error: true` keeps any other gh-pages-side surprise
+ # (permission glitches, transient git failures) from failing the
+ # bench job overall — the bench data itself is already in the
+ # uploaded artifact above.
+ - name: Publish throughput
+ if: ${{ !cancelled() }}
+ continue-on-error: true
+ uses: benchmark-action/github-action-benchmark@52576c92bccf6ac60c8223ec7eb2565637cae9ba # v1.22.1
+ with:
+ name: Arrow Flight E2E Throughput
+ tool: customBiggerIsBetter
+ output-file-path: bench-results/arrow-flight-e2e-throughput.json
+ github-token: ${{ secrets.GITHUB_TOKEN }}
+ auto-push: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }}
+ save-data-file: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }}
+ skip-fetch-gh-pages: true
+ gh-pages-branch: gh-pages
+ benchmark-data-dir-path: dev/bench
+ alert-threshold: "150%"
+ # comment-on-alert needs pull-requests:write; skip and let
+ # results show up via summary-always instead.
+ comment-on-alert: false
+ summary-always: true
+ - name: Publish latency
+ if: ${{ !cancelled() }}
+ continue-on-error: true
+ uses: benchmark-action/github-action-benchmark@52576c92bccf6ac60c8223ec7eb2565637cae9ba # v1.22.1
+ with:
+ name: Arrow Flight E2E Latency
+ tool: customSmallerIsBetter
+ output-file-path: bench-results/arrow-flight-e2e-latency.json
+ github-token: ${{ secrets.GITHUB_TOKEN }}
+ auto-push: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }}
+ save-data-file: ${{ (github.event_name == 'push' && github.ref == 'refs/heads/main') || github.event_name == 'schedule' }}
+ skip-fetch-gh-pages: true
+ gh-pages-branch: gh-pages
+ benchmark-data-dir-path: dev/bench
+ alert-threshold: "150%"
+ comment-on-alert: false
+ summary-always: true
diff --git a/amber/build.sbt b/amber/build.sbt
index aa775f7301..efcaee5006 100644
--- a/amber/build.sbt
+++ b/amber/build.sbt
@@ -62,6 +62,13 @@ Compile / unmanagedSourceDirectories += baseDirectory.value / "src" / "main" / "
// AMBER_TEST_FILTER env var below routes which tagged subset runs.
Test / unmanagedSourceDirectories += baseDirectory.value / "src" / "test" / "integration"
+// `amber/src/bench` holds performance benchmarks (no pass/fail assertion;
+// emit metrics for the github-action-benchmark CI dashboard). Kept out of
+// `src/test/` so reviewers don't conflate "runs in test suite" with "is a
+// test". Same Test-scope wiring as `integration/` above so scalafmt /
+// scalafix still cover it and `sbt Test/runMain` can invoke benches.
+Test / unmanagedSourceDirectories += baseDirectory.value / "src" / "bench" / "scala"
+
// Test-filter switch driven by the AMBER_TEST_FILTER env var so the
// amber and amber-integration CI jobs select disjoint subsets without
// each invocation having to embed a `set Tests.Argument(...)` prefix.
diff --git a/amber/src/bench/scala/org/apache/texera/amber/bench/ArrowFlightActorBench.scala b/amber/src/bench/scala/org/apache/texera/amber/bench/ArrowFlightActorBench.scala
new file mode 100644
index 0000000000..79d0c8cd7d
--- /dev/null
+++ b/amber/src/bench/scala/org/apache/texera/amber/bench/ArrowFlightActorBench.scala
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.bench
+
+import org.apache.pekko.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props}
+import org.apache.pekko.testkit.TestProbe
+import org.apache.texera.amber.clustering.SingleNodeListener
+import org.apache.texera.amber.core.executor.OpExecWithCode
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.texera.amber.core.virtualidentity.{
+ ActorVirtualIdentity,
+ ChannelIdentity,
+ EmbeddedControlMessageIdentity,
+ WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity}
+import org.apache.texera.amber.engine.architecture.common.WorkflowActor.{NetworkAck, NetworkMessage}
+import org.apache.texera.amber.engine.architecture.pythonworker.PythonWorkflowWorker
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands._
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.ReturnInvocation
+import org.apache.texera.amber.engine.architecture.scheduling.config.WorkerConfig
+import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.OneToOnePartitioning
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage}
+import org.apache.texera.amber.engine.common.ambermessage.WorkflowMessage.getInMemSize
+import org.apache.texera.amber.util.VirtualIdentityUtils
+
+import java.io.PrintWriter
+import java.nio.file.{Files, Paths}
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * End-to-end micro-benchmark of the real Arrow Flight data path through a
+ * live PythonWorkflowWorker actor.
+ *
+ * Each measured config spawns a fresh PythonWorkflowWorker (real Pekko actor,
+ * real Python subprocess via texera_run_python_worker.py, real Arrow Flight
+ * transport), wires up an identity Python UDF, and times the round-trip of
+ * `numBatches` DataFrames send→echo through the actor mailbox.
+ *
+ * Output (rewritten incrementally after every config so a killed sweep
+ * still preserves usable data):
+ * - stdout summary per config
+ * - bench-results/arrow-flight-e2e.csv (one row per config)
+ * - bench-results/arrow-flight-e2e-throughput.json (github-action-benchmark customBiggerIsBetter)
+ * - bench-results/arrow-flight-e2e-latency.json (github-action-benchmark customSmallerIsBetter)
+ *
+ * Run with:
+ * sbt "WorkflowExecutionService/Test/runMain \
+ * org.apache.texera.amber.bench.ArrowFlightActorBench"
+ *
+ * Caveat: `Utils.amberHomePath` does a `Files.walk(cwd, 2).findAny` for any
+ * dir ending in `amber`. If `.claude/amber/` exists locally, the Python
+ * subprocess may end up trying to launch from that path; move it aside for
+ * the run, or fix `amberHomePath` upstream.
+ */
+object ArrowFlightActorBench {
+
+ // ---------------------------------------------------------------------------
+ // Identity Python UDF — passes input tuples straight through to output.
+ // ---------------------------------------------------------------------------
+ private val IdentityPythonCode: String =
+ """from pytexera import *
+ |
+ |class ProcessTupleOperator(UDFOperatorV2):
+ | @overrides
+ | def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
+ | yield tuple_
+ |""".stripMargin
+
+ private val WorkflowId = WorkflowIdentity(1L)
+ private val InputPortId = PortIdentity(id = 0, internal = false)
+ private val OutputPortId = PortIdentity(id = 0, internal = false)
+
+ // Sweep grid + iteration counts switch on BENCH_MODE so PR / post-merge
+ // checks stay around 5 min while scheduled / manual runs do the full
+ // 36-config grid that the gh-pages dashboard tracks long-term.
+ // pr — 3 configs × 20 batches, warmup 5 (~4-5 min in CI)
+ // full — 36 configs × 200 batches, warmup 20 (~50-60 min in CI)
+ // BENCH_NUM_BATCHES, if set, overrides numBatches for the current mode
+ // (useful for local smoke).
+ private val BenchMode: String = sys.env.getOrElse("BENCH_MODE", "full").toLowerCase
+
+ private case class GridSpec(
+ batchSizes: Seq[Int],
+ schemaWidths: Seq[Int],
+ stringLens: Seq[Int],
+ numBatches: Int,
+ warmupBatches: Int
+ )
+
+ private val grid: GridSpec = BenchMode match {
+ case "pr" =>
+ GridSpec(
+ batchSizes = Seq(10, 100, 1000),
+ schemaWidths = Seq(10),
+ stringLens = Seq(64),
+ numBatches = sys.env.get("BENCH_NUM_BATCHES").map(_.toInt).getOrElse(20),
+ warmupBatches = 5
+ )
+ case _ =>
+ GridSpec(
+ batchSizes = Seq(10, 100, 1000, 10000),
+ schemaWidths = Seq(1, 10, 50),
+ stringLens = Seq(8, 64, 512),
+ numBatches = sys.env.get("BENCH_NUM_BATCHES").map(_.toInt).getOrElse(200),
+ warmupBatches = 20
+ )
+ }
+
+ private val DefaultBatchSizes: Seq[Int] = grid.batchSizes
+ private val DefaultSchemaWidths: Seq[Int] = grid.schemaWidths
+ private val DefaultStringLens: Seq[Int] = grid.stringLens
+ private val DefaultNumBatches: Int = grid.numBatches
+ private val WarmupBatches: Int = grid.warmupBatches
+
+ // All artifacts land under bench-results/ so CI can artifact-upload the
+ // whole directory uniformly without knowing individual filenames beyond
+ // what its publish matrix declares.
+ // Conventions for new benches:
+ // bench-results/<bench-name>-throughput.json → customBiggerIsBetter
+ // bench-results/<bench-name>-latency.json → customSmallerIsBetter
+ // bench-results/<bench-name>-jmh.json → tool=jmh
+ private val OutDir = Paths.get("bench-results")
+ private val CsvOutPath = OutDir.resolve("arrow-flight-e2e.csv")
+ // Two JSON files: github-action-benchmark picks the comparison direction from
+ // its `tool` parameter, so throughput (customBiggerIsBetter) and latency
+ // (customSmallerIsBetter) must be published as separate inputs.
+ private val ThroughputJsonPath = OutDir.resolve("arrow-flight-e2e-throughput.json")
+ private val LatencyJsonPath = OutDir.resolve("arrow-flight-e2e-latency.json")
+
+ // ---------------------------------------------------------------------------
+ // Sink actor: stands in for the downstream worker. Auto-acks every
+ // NetworkMessage from the worker (otherwise PekkoMessageTransferService
+ // throttles after the first unacked reply and the bench stalls), and
+ // forwards every received message to the bench probe for inspection.
+ // ---------------------------------------------------------------------------
+ private class AutoAckSink(forwardTo: ActorRef) extends Actor {
+ override def receive: Receive = {
+ case msg @ NetworkMessage(id, internal) =>
+ sender() ! NetworkAck(id, getInMemSize(internal), 0L)
+ forwardTo ! msg
+ case other =>
+ forwardTo ! other
+ }
+ }
+
+ private case class BenchConfig(
+ configIdx: Int,
+ batchSize: Int,
+ schemaWidth: Int,
+ stringLen: Int,
+ numBatches: Int
+ )
+
+ private case class BenchResult(
+ cfg: BenchConfig,
+ totalWallNs: Long,
+ totalTuples: Long,
+ totalBytesApprox: Long,
+ latencyP50Ns: Long,
+ latencyP95Ns: Long,
+ latencyP99Ns: Long
+ ) {
+ def tuplesPerSec: Double = totalTuples * 1e9 / totalWallNs
+ def mbPerSec: Double = totalBytesApprox * 1e9 / totalWallNs / (1024.0 * 1024.0)
+ }
+
+ def main(args: Array[String]): Unit = {
+ val system = ActorSystem("arrow-flight-bench", AmberRuntime.pekkoConfig)
+ system.actorOf(Props[SingleNodeListener](), "cluster-info")
+
+ val configs: Seq[BenchConfig] = (for {
+ sw <- DefaultSchemaWidths
+ sl <- DefaultStringLens
+ bs <- DefaultBatchSizes
+ } yield (sw, sl, bs)).zipWithIndex.map {
+ case ((sw, sl, bs), idx) =>
+ BenchConfig(
+ idx,
+ batchSize = bs,
+ schemaWidth = sw,
+ stringLen = sl,
+ numBatches = DefaultNumBatches
+ )
+ }
+
+ println(s"[bench] sweeping ${configs.size} configurations × ${DefaultNumBatches} batches each")
+ // Pre-create output dir + rewrite the result files after every completed
+ // config so a killed / timed-out sweep still leaves a usable artifact.
+ Files.createDirectories(OutDir)
+ val resultsBuf = scala.collection.mutable.ArrayBuffer.empty[BenchResult]
+ configs.foreach { cfg =>
+ try {
+ val r = runConfig(system, cfg)
+ resultsBuf += r
+ writeCsv(resultsBuf.toSeq)
+ writeJsonForGitHubActionBenchmark(resultsBuf.toSeq)
+ } catch {
+ case t: Throwable =>
+ println(s"[bench] FAILED config #${cfg.configIdx} ($cfg): $t")
+ }
+ }
+ printSummary(resultsBuf.toSeq)
+ Await.result(system.terminate(), 30.seconds)
+ }
+
+ // ---------------------------------------------------------------------------
+ // One configuration: spawn fresh worker, run warmup + timed loop, tear down.
+ // ---------------------------------------------------------------------------
+ private def runConfig(system: ActorSystem, cfg: BenchConfig): BenchResult = {
+ val workerId =
+ VirtualIdentityUtils.createWorkerIdentity(WorkflowId, "bench", "main", cfg.configIdx)
+ val downstreamId =
+ VirtualIdentityUtils.createWorkerIdentity(WorkflowId, "benchsink", "main", cfg.configIdx)
+
+ val ctlChannelIn = ChannelIdentity(downstreamId, workerId, isControl = true)
+ val dataChannelIn = ChannelIdentity(downstreamId, workerId, isControl = false)
+ val dataChannelOut = ChannelIdentity(workerId, downstreamId, isControl = false)
+
+ val probe = TestProbe()(system)
+ val sink = system.actorOf(
+ Props(new AutoAckSink(probe.ref)),
+ name = s"bench-sink-${cfg.configIdx}"
+ )
+ val worker = system.actorOf(
+ PythonWorkflowWorker.props(WorkerConfig(workerId)),
+ name = s"bench-worker-${cfg.configIdx}"
+ )
+
+ println(s"\n[bench] config #${cfg.configIdx}: $cfg")
+
+ try {
+ val schema = makeSchema(cfg.schemaWidth)
+ val schemaMap = schema.getAttributes.map(a => a.getName -> a.getType.name()).toMap
+
+ var ctlSeq: Long = 0L
+ var dataSeq: Long = 0L
+ var msgId: Long = 0L
+
+ def sendCtl(payload: ControlInvocation): Unit = {
+ val fifo = WorkflowFIFOMessage(ctlChannelIn, ctlSeq, payload)
+ ctlSeq += 1
+ worker.tell(NetworkMessage(msgId, fifo), sink)
+ msgId += 1
+ }
+ def sendOnDataChannel(
+ payload: org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessagePayload
+ ): Unit = {
+ val fifo = WorkflowFIFOMessage(dataChannelIn, dataSeq, payload)
+ dataSeq += 1
+ worker.tell(NetworkMessage(msgId, fifo), sink)
+ msgId += 1
+ }
+
+ // -----------------------------------------------------------------------
+ // Setup control sequence + StartChannel ECM (see Pass 1 for details).
+ // -----------------------------------------------------------------------
+ val ctx = AsyncRPCContext(sender = downstreamId, receiver = workerId)
+ sendCtl(
+ ControlInvocation(
+ "InitializeExecutor",
+ InitializeExecutorRequest(
+ 1,
+ OpExecWithCode(IdentityPythonCode, "python"),
+ isSource = false
+ ),
+ ctx,
+ 0L
+ )
+ )
+ sendCtl(
+ ControlInvocation(
+ "AssignPort",
+ AssignPortRequest(InputPortId, input = true, schemaMap, Seq.empty, Seq.empty),
+ ctx,
+ 1L
+ )
+ )
+ sendCtl(
+ ControlInvocation(
+ "AssignPort",
+ AssignPortRequest(OutputPortId, input = false, schemaMap, Seq.empty, Seq.empty),
+ ctx,
+ 2L
+ )
+ )
+ sendCtl(
+ ControlInvocation(
+ "AddInputChannel",
+ AddInputChannelRequest(dataChannelIn, InputPortId),
+ ctx,
+ 3L
+ )
+ )
+ val outLink = PhysicalLink(
+ fromOpId = VirtualIdentityUtils.getPhysicalOpId(workerId),
+ fromPortId = OutputPortId,
+ toOpId = VirtualIdentityUtils.getPhysicalOpId(downstreamId),
+ toPortId = InputPortId
+ )
+ sendCtl(
+ ControlInvocation(
+ "AddPartitioning",
+ AddPartitioningRequest(
+ outLink,
+ // batch_size = cfg.batchSize keeps the Python-side partitioning
+ // buffer aligned with our send size — one Java DataFrame in maps
+ // to exactly one Python DataFrame out.
+ OneToOnePartitioning(batchSize = cfg.batchSize, channels = Seq(dataChannelOut))
+ ),
+ ctx,
+ 4L
+ )
+ )
+ sendCtl(ControlInvocation("OpenExecutor", EmptyRequest(), ctx, 5L))
+ sendCtl(ControlInvocation("StartWorker", EmptyRequest(), ctx, 6L))
+
+ waitForReturns(probe, 7, 60.seconds)
+
+ // StartChannel ECM enables data flow on the input channel.
+ val startChannelEcm = EmbeddedControlMessage(
+ id = EmbeddedControlMessageIdentity("StartChannel"),
+ ecmType = EmbeddedControlMessageType.NO_ALIGNMENT,
+ scope = Seq.empty,
+ commandMapping = Map(
+ workerId.name -> ControlInvocation(
+ "StartChannel",
+ EmptyRequest(),
+ AsyncRPCContext(ActorVirtualIdentity(""), ActorVirtualIdentity("")),
+ -1L
+ )
+ )
+ )
+ sendOnDataChannel(startChannelEcm)
+ // Drain the StartChannel-echo the worker forwards downstream so it
+ // doesn't show up in the data-loop's measurement window.
+ drainNonDataFor(probe, 2.seconds)
+
+ // -----------------------------------------------------------------------
+ // Build sample tuples once; reuse across all batches in this config.
+ // -----------------------------------------------------------------------
+ val sampleBatch: Array[Tuple] = buildBatch(schema, cfg.batchSize, cfg.stringLen)
+ val approxBytesPerBatch: Long =
+ cfg.batchSize.toLong * cfg.schemaWidth.toLong * cfg.stringLen.toLong
+
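+ // Warmup: let the JIT settle and Python import any lazy modules. Fail fast
+ // on a missed echo instead of resending, which could loop forever or leak
+ // a late echo into the timed loop.
+ var warmedBatches = 0
+ while (warmedBatches < WarmupBatches) {
+ sendOnDataChannel(DataFrame(sampleBatch))
+ if (!awaitOneDataFrameEcho(probe, 30.seconds)) {
+ throw new RuntimeException(s"timed out waiting for warmup echo $warmedBatches")
+ }
+ warmedBatches += 1
+ }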
+
+ // -----------------------------------------------------------------------
+ // Timed loop — per-batch latency from send to corresponding echo.
+ // Because the Python pipeline is FIFO, sending batch i then awaiting
+ // exactly one DataFrame echo gives latency_i = receive_i - send_i.
+ // -----------------------------------------------------------------------
+ val latencies = new Array[Long](cfg.numBatches)
+ val totalStart = System.nanoTime()
+ var i = 0
+ while (i < cfg.numBatches) {
+ val t0 = System.nanoTime()
+ sendOnDataChannel(DataFrame(sampleBatch))
+ if (!awaitOneDataFrameEcho(probe, 60.seconds)) {
+ throw new RuntimeException(s"timed out waiting for echo of batch $i")
+ }
+ latencies(i) = System.nanoTime() - t0
+ i += 1
+ }
+ val totalNs = System.nanoTime() - totalStart
+
+ val totalTuples = cfg.numBatches.toLong * cfg.batchSize.toLong
+ val totalBytes = cfg.numBatches.toLong * approxBytesPerBatch
+ val result = BenchResult(
+ cfg,
+ totalWallNs = totalNs,
+ totalTuples = totalTuples,
+ totalBytesApprox = totalBytes,
+ latencyP50Ns = percentile(latencies, 0.50),
+ latencyP95Ns = percentile(latencies, 0.95),
+ latencyP99Ns = percentile(latencies, 0.99)
+ )
+
+ printOne(result)
+ result
+ } finally {
+ worker ! PoisonPill
+ sink ! PoisonPill
+ // Give the worker a moment to tear down its Python subprocess + flight
+ // server cleanly before we move to the next config.
+ Thread.sleep(500)
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Helpers
+ // ---------------------------------------------------------------------------
+ private def makeSchema(width: Int): Schema = {
+ val attrs = (0 until width).map(i => new Attribute(s"col$i", AttributeType.STRING))
+ Schema(attrs.toList)
+ }
+
+ private def buildBatch(schema: Schema, batchSize: Int, stringLen: Int): Array[Tuple] = {
+ val arr = new Array[Tuple](batchSize)
+ val sampleVal = "x" * stringLen
+ var i = 0
+ val attrs = schema.getAttributes
+ while (i < batchSize) {
+ val b = Tuple.builder(schema)
+ var j = 0
+ while (j < attrs.size) {
+ b.add(attrs(j), sampleVal)
+ j += 1
+ }
+ arr(i) = b.build()
+ i += 1
+ }
+ arr
+ }
+
+ private def waitForReturns(probe: TestProbe, n: Int, timeout: FiniteDuration): Unit = {
+ val deadline = System.currentTimeMillis() + timeout.toMillis
+ var seen = 0
+ while (seen < n && System.currentTimeMillis() < deadline) {
+ probe.receiveOne(2.seconds) match {
+ case NetworkMessage(_, WorkflowFIFOMessage(_, _, _: ReturnInvocation)) =>
+ seen += 1
+ case _ => // ignore acks + other
+ }
+ }
+ if (seen < n) {
+ throw new RuntimeException(s"only $seen/$n control returns within $timeout")
+ }
+ }
+
+ private def awaitOneDataFrameEcho(probe: TestProbe, timeout: FiniteDuration): Boolean = {
+ // Each iteration uses the *remaining* time, not the full timeout — so a
+ // flood of ACK / ECM messages can't extend the overall wait beyond the
+ // caller's deadline by `timeout` × N.
+ val deadline = System.nanoTime() + timeout.toNanos
+ while (true) {
+ val remainingNs = deadline - System.nanoTime()
+ if (remainingNs <= 0) return false
+ probe.receiveOne(remainingNs.nanos) match {
+ case NetworkMessage(_, WorkflowFIFOMessage(_, _, _: DataFrame)) => return true
+ case null =>
+ return false
+ case _ => // ignore acks, ECM forwards; loop
+ }
+ }
+ false
+ }
+
+ private def drainNonDataFor(probe: TestProbe, dur: FiniteDuration): Unit = {
+ val end = System.currentTimeMillis() + dur.toMillis
+ while (System.currentTimeMillis() < end) {
+ probe.receiveOne(200.millis) match {
+ case null => return
+ case _ => // discard
+ }
+ }
+ }
+
+ private def percentile(values: Array[Long], q: Double): Long = {
+ if (values.isEmpty) 0L
+ else {
+ val sorted = values.sorted
+ val idx = math.min(sorted.length - 1, math.max(0, (sorted.length * q).toInt))
+ sorted(idx)
+ }
+ }
+
+ private def printOne(r: BenchResult): Unit = {
+ val ms = r.totalWallNs / 1e6
+ println(
+ f" -> total=${ms}%.0fms tuples/s=${r.tuplesPerSec}%,.0f MB/s=${r.mbPerSec}%.2f " +
+ f"p50=${r.latencyP50Ns / 1000.0}%.1fus p95=${r.latencyP95Ns / 1000.0}%.1fus " +
+ f"p99=${r.latencyP99Ns / 1000.0}%.1fus"
+ )
+ }
+
+ private def writeCsv(results: Seq[BenchResult]): Unit = {
+ val pw = new PrintWriter(Files.newBufferedWriter(CsvOutPath))
+ try {
+ pw.println(
+ "config_idx,batch_size,schema_width,string_len,num_batches," +
+ "total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec," +
+ "lat_p50_us,lat_p95_us,lat_p99_us"
+ )
+ results.foreach { r =>
+ val c = r.cfg
+ pw.println(
+ List(
+ c.configIdx,
+ c.batchSize,
+ c.schemaWidth,
+ c.stringLen,
+ c.numBatches,
+ f"${r.totalWallNs / 1e6}%.2f",
+ r.totalTuples,
+ r.totalBytesApprox,
+ f"${r.tuplesPerSec}%.0f",
+ f"${r.mbPerSec}%.3f",
+ f"${r.latencyP50Ns / 1000.0}%.2f",
+ f"${r.latencyP95Ns / 1000.0}%.2f",
+ f"${r.latencyP99Ns / 1000.0}%.2f"
+ ).mkString(",")
+ )
+ }
+ } finally pw.close()
+ println(s"\n[bench] wrote ${results.size} rows to ${CsvOutPath.toAbsolutePath}")
+ }
+
+ /**
+ * Emit two JSON arrays per github-action-benchmark's customBiggerIsBetter
+ * (throughput) and customSmallerIsBetter (latency) input formats. Each
+ * config produces one throughput entry and three latency entries (p50/p95/
+ * p99). Spec: https://github.com/benchmark-action/github-action-benchmark
+ */
+ private def writeJsonForGitHubActionBenchmark(results: Seq[BenchResult]): Unit = {
+ def cfgLabel(c: BenchConfig): String =
+ s"bs=${c.batchSize} sw=${c.schemaWidth} sl=${c.stringLen}"
+
+ def jsonEntry(name: String, unit: String, value: Double): String =
+ s""" { "name": ${quoteJson(name)}, "unit": ${quoteJson(unit)}, "value": $value }"""
+
+ val throughputEntries = results.map { r =>
+ jsonEntry(s"throughput / ${cfgLabel(r.cfg)}", "tuples/sec", r.tuplesPerSec)
+ }
+ val latencyEntries = results.flatMap { r =>
+ Seq(
+ jsonEntry(s"latency p50 / ${cfgLabel(r.cfg)}", "us", r.latencyP50Ns / 1000.0),
+ jsonEntry(s"latency p95 / ${cfgLabel(r.cfg)}", "us", r.latencyP95Ns / 1000.0),
+ jsonEntry(s"latency p99 / ${cfgLabel(r.cfg)}", "us", r.latencyP99Ns / 1000.0)
+ )
+ }
+
+ writeJsonArray(ThroughputJsonPath, throughputEntries)
+ writeJsonArray(LatencyJsonPath, latencyEntries)
+ println(
+ s"[bench] wrote ${results.size} throughput entries to ${ThroughputJsonPath.toAbsolutePath}"
+ )
+ println(
+ s"[bench] wrote ${latencyEntries.size} latency entries to ${LatencyJsonPath.toAbsolutePath}"
+ )
+ }
+
+ private def writeJsonArray(path: java.nio.file.Path, entries: Seq[String]): Unit = {
+ val pw = new PrintWriter(Files.newBufferedWriter(path))
+ try {
+ pw.println("[")
+ pw.println(entries.mkString(",\n"))
+ pw.println("]")
+ } finally pw.close()
+ }
+
+ private def quoteJson(s: String): String =
+ "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
+
+ private def printSummary(results: Seq[BenchResult]): Unit = {
+ println("\n[bench] === summary ===")
+ println(
+ f"${"#"}%3s ${"bs"}%5s ${"sw"}%3s ${"sl"}%4s ${"tuples/s"}%10s ${"MB/s"}%7s " +
+ f"${"p50us"}%8s ${"p99us"}%8s"
+ )
+ results.foreach { r =>
+ println(
+ f"${r.cfg.configIdx}%3d ${r.cfg.batchSize}%5d ${r.cfg.schemaWidth}%3d ${r.cfg.stringLen}%4d " +
+ f"${r.tuplesPerSec}%,10.0f ${r.mbPerSec}%7.2f " +
+ f"${r.latencyP50Ns / 1000.0}%8.1f ${r.latencyP99Ns / 1000.0}%8.1f"
+ )
+ }
+ }
+}
diff --git a/bin/run-benchmarks.sh b/bin/run-benchmarks.sh
new file mode 100755
index 0000000000..c3f843c9a3
--- /dev/null
+++ b/bin/run-benchmarks.sh
@@ -0,0 +1,59 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Single entry-point for all Texera benchmarks. CI calls this script
+# verbatim — it does NOT reference individual benchmark main classes.
+# Adding a new benchmark (e.g., a JMH suite) means appending one block
+# to this script; no CI workflow change.
+#
+# Output convention: every benchmark writes to bench-results/ with a
+# self-describing filename suffix that matches the github-action-benchmark
+# `tool` parameter expected by the publish step in build.yml:
+# bench-results/<bench>-throughput.json → tool: customBiggerIsBetter
+# bench-results/<bench>-latency.json → tool: customSmallerIsBetter
+# bench-results/<bench>-jmh.json → tool: jmh
+# CSV / log / debug files may live alongside; the publish matrix only
+# cares about the *.json files declared in build.yml.
+#
+# Env vars honored:
+# BENCH_NUM_BATCHES — passes through to the e2e bench (default 20 in pr
+# mode, 200 in full). Lower for fast runs; higher for stable ones.
+# UDF_PYTHON_PATH — Python executable for the spawned worker subprocess.
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+REPO_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
+cd "$REPO_ROOT"
+
+mkdir -p bench-results
+
+echo "=== run-benchmarks: arrow-flight-e2e ==="
+sbt --error \
+ "WorkflowExecutionService/Test/runMain org.apache.texera.amber.bench.ArrowFlightActorBench"
+
+# Future benchmarks: add new blocks below. Each block should self-contain
+# the run command and ensure its outputs land in bench-results/. Example
+# for a future JMH suite:
+# echo "=== run-benchmarks: arrow-utils-jmh ==="
+# sbt "WorkflowExecutionService/Jmh/run -rf json -rff $REPO_ROOT/bench-results/arrow-utils-jmh.json"
+
+echo
+echo "=== bench artifacts ==="
+ls -la bench-results/
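
For a quick local check of how the environment variables interact, the batch-count selection in `ArrowFlightActorBench` can be sketched in shell as below. This is only an illustration: `resolve_num_batches` is a hypothetical helper that is not part of `bin/run-benchmarks.sh`, and it simply mirrors the defaults in the Scala source (20 batches in `pr` mode, 200 otherwise, with `BENCH_NUM_BATCHES` overriding either). One assumed difference: an empty `BENCH_NUM_BATCHES` is treated as unset here.

```shell
#!/usr/bin/env bash
# Hypothetical helper, not part of the commit: mirrors how ArrowFlightActorBench
# derives numBatches from BENCH_MODE and BENCH_NUM_BATCHES.
resolve_num_batches() {
  local mode
  # The bench lowercases BENCH_MODE and falls back to "full" when it is unset.
  mode="$(printf '%s' "${BENCH_MODE:-full}" | tr '[:upper:]' '[:lower:]')"
  if [ -n "${BENCH_NUM_BATCHES:-}" ]; then
    echo "$BENCH_NUM_BATCHES"   # explicit override wins in either mode
  elif [ "$mode" = "pr" ]; then
    echo 20                     # pr grid: 3 configs x 20 batches
  else
    echo 200                    # any other value selects the full grid
  fi
}

echo "numBatches for this run: $(resolve_num_batches)"
# Example smoke run (assumes sbt and a Python worker environment are set up):
#   BENCH_MODE=pr BENCH_NUM_BATCHES=5 bin/run-benchmarks.sh
```

Because the script passes its environment through to sbt unchanged, setting both variables on the command line should be enough to shrink a local run without editing any file.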