This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 957dddc  perf: setup tpch benchmark infra (#538)
957dddc is described below

commit 957dddcea23a7e8ec564ffb2ab0d90d420d4e8d9
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Mar 9 10:29:59 2026 -0500

    perf: setup tpch benchmark infra (#538)
---
 .gitignore => .dockerignore                        |   26 +-
 .gitignore                                         |    2 +
 .licenserc.yaml                                    |    1 +
 Cargo.toml                                         |    1 +
 Makefile                                           |   40 +-
 .gitignore => benchmark/tpch/Cargo.toml            |   44 +-
 benchmark/tpch/README.md                           |   63 +
 .gitignore => benchmark/tpch/config/sf1.yaml       |   42 +-
 .gitignore => benchmark/tpch/config/sf10.yaml      |   42 +-
 .gitignore => benchmark/tpch/config/sf100.yaml     |   42 +-
 .gitignore => benchmark/tpch/config/sf1000.yaml    |   42 +-
 benchmark/tpch/config/tables.yaml                  |   53 +
 benchmark/tpch/infra/Dockerfile                    |   56 +
 benchmark/tpch/infra/spark/bench.py                |  118 ++
 .../tpch/infra/spark/hudi-defaults.conf            |   25 +-
 .../tpch/infra/spark/log4j2.properties             |   27 +-
 .../tpch/infra/spark/spark-defaults.conf           |   26 +-
 benchmark/tpch/queries/q1.sql                      |   23 +
 benchmark/tpch/queries/q10.sql                     |   33 +
 benchmark/tpch/queries/q11.sql                     |   29 +
 benchmark/tpch/queries/q12.sql                     |   30 +
 benchmark/tpch/queries/q13.sql                     |   22 +
 benchmark/tpch/queries/q14.sql                     |   15 +
 benchmark/tpch/queries/q15.sql                     |   33 +
 benchmark/tpch/queries/q16.sql                     |   32 +
 benchmark/tpch/queries/q17.sql                     |   19 +
 benchmark/tpch/queries/q18.sql                     |   34 +
 benchmark/tpch/queries/q19.sql                     |   37 +
 benchmark/tpch/queries/q2.sql                      |   45 +
 benchmark/tpch/queries/q20.sql                     |   39 +
 benchmark/tpch/queries/q21.sql                     |   41 +
 benchmark/tpch/queries/q22.sql                     |   39 +
 benchmark/tpch/queries/q3.sql                      |   24 +
 benchmark/tpch/queries/q4.sql                      |   23 +
 benchmark/tpch/queries/q5.sql                      |   26 +
 benchmark/tpch/queries/q6.sql                      |   11 +
 benchmark/tpch/queries/q7.sql                      |   41 +
 benchmark/tpch/queries/q8.sql                      |   39 +
 benchmark/tpch/queries/q9.sql                      |   34 +
 benchmark/tpch/run.sh                              |  458 +++++++
 benchmark/tpch/src/config.rs                       |  248 ++++
 benchmark/tpch/src/datagen.rs                      |  192 +++
 benchmark/tpch/src/main.rs                         | 1268 ++++++++++++++++++++
 43 files changed, 3310 insertions(+), 175 deletions(-)

diff --git a/.gitignore b/.dockerignore
similarity index 81%
copy from .gitignore
copy to .dockerignore
index b6e2fa9..70e0351 100644
--- a/.gitignore
+++ b/.dockerignore
@@ -15,24 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-/Cargo.lock
-/target
-**/target
-
-/.idea
-.vscode
-
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
-
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+target
+.git
+benchmark/tpch/data
+benchmark/tpch/results
+python/.venv
diff --git a/.gitignore b/.gitignore
index b6e2fa9..7373a8d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,6 +18,8 @@
 /Cargo.lock
 /target
 **/target
+**/data
+**/results
 
 /.idea
 .vscode
diff --git a/.licenserc.yaml b/.licenserc.yaml
index e79c793..57569a6 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -26,6 +26,7 @@ header:
     - '**/data/**'
     - '.github/PULL_REQUEST_TEMPLATE.md'
     - 'crates/core/schemas/**'
+    - 'benchmark/tpch/queries/**'
 
   comment: on-failure
 
diff --git a/Cargo.toml b/Cargo.toml
index be59e70..b9badbb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,6 +20,7 @@ members = [
     "crates/*",
     "cpp",
     "python",
+    "benchmark/tpch",
 ]
 resolver = "2"
 
diff --git a/Makefile b/Makefile
index 23198fd..3979066 100644
--- a/Makefile
+++ b/Makefile
@@ -49,7 +49,8 @@ COV_OUTPUT_DIR := ./cov-reports
 COV_THRESHOLD ?= 60
 COV_EXCLUDE := \
        --exclude-files 'cpp/src/*' \
-       --exclude-files 'crates/core/src/avro_to_arrow/*'
+       --exclude-files 'crates/core/src/avro_to_arrow/*' \
+       --exclude-files 'benchmark/*'
 TARPAULIN_COMMON := --engine llvm --no-dead-code --no-fail-fast \
        --all-features --workspace $(COV_EXCLUDE) --skip-clean
 
@@ -158,3 +159,40 @@ coverage-check: ## Fail if coverage is below threshold (COV_THRESHOLD=60)
 .PHONY: clean-coverage
 clean-coverage: ## Remove coverage reports
        rm -rf $(COV_OUTPUT_DIR)
+
+# =============================================================================
+# TPC-H Benchmark
+# =============================================================================
+SF ?= 0.001
+ENGINE ?= datafusion
+FORMAT ?= hudi
+MODE ?= native
+QUERIES ?=
+TPCH_DIR := benchmark/tpch
+TPCH_DATA_DIR := $(TPCH_DIR)/data
+TPCH_RESULTS_DIR := $(TPCH_DIR)/results
+
+.PHONY: tpch-generate
+tpch-generate: ## Generate TPC-H parquet tables (SF=0.001)
+       $(info --- Generate TPC-H parquet tables at SF=$(SF) ---)
+       $(TPCH_DIR)/run.sh generate --scale-factor $(SF)
+
+.PHONY: tpch-create-tables
+tpch-create-tables: ## Create Hudi COW tables from parquet (SF=0.001, requires Docker)
+       $(info --- Create Hudi tables at SF=$(SF) ---)
+       $(TPCH_DIR)/run.sh create-tables --scale-factor $(SF)
+
+.PHONY: bench-tpch
+bench-tpch: ## Run TPC-H benchmark (ENGINE=datafusion|spark SF=0.001 MODE=native|docker QUERIES=1,3,6)
+       $(info --- Benchmark at SF=$(SF) MODE=$(MODE) ---)
+ifeq ($(ENGINE),spark)
+       MODE=$(MODE) $(TPCH_DIR)/run.sh bench-spark --scale-factor $(SF) --format $(FORMAT) $(if $(QUERIES),--queries $(QUERIES)) --output-dir $(TPCH_RESULTS_DIR)
+else ifeq ($(ENGINE),datafusion)
+       MODE=$(MODE) $(TPCH_DIR)/run.sh bench-datafusion --scale-factor $(SF) --format $(FORMAT) $(if $(QUERIES),--queries $(QUERIES)) --output-dir $(TPCH_RESULTS_DIR)
+else
+       $(error Unknown ENGINE=$(ENGINE). Use datafusion or spark)
+endif
+
+.PHONY: tpch-compare
+tpch-compare: ## Compare persisted TPC-H benchmark results (ENGINES=datafusion,spark SF=0.001)
+       $(TPCH_DIR)/run.sh compare --scale-factor $(SF) --engines $(ENGINES) --format $(FORMAT)
diff --git a/.gitignore b/benchmark/tpch/Cargo.toml
similarity index 54%
copy from .gitignore
copy to benchmark/tpch/Cargo.toml
index b6e2fa9..218b899 100644
--- a/.gitignore
+++ b/benchmark/tpch/Cargo.toml
@@ -15,24 +15,30 @@
 # specific language governing permissions and limitations
 # under the License.
 
-/Cargo.lock
-/target
-**/target
+[package]
+name = "tpch"
+version.workspace = true
+edition.workspace = true
 
-/.idea
-.vscode
+[[bin]]
+name = "tpch"
+path = "src/main.rs"
 
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
-
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+[dependencies]
+arrow = { workspace = true }
+arrow-array = { workspace = true }
+arrow-cast = { workspace = true }
+clap = { version = "4", features = ["derive"] }
+env_logger = "0.11"
+comfy-table = "7"
+datafusion = { workspace = true }
+hudi = { path = "../../crates/hudi", features = ["datafusion"] }
+object_store = { workspace = true }
+parquet = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+tokio = { workspace = true }
+tpchgen = "2"
+tpchgen-arrow = "2"
+serde_yaml = "0.9"
+url = { workspace = true }
diff --git a/benchmark/tpch/README.md b/benchmark/tpch/README.md
new file mode 100644
index 0000000..ae87196
--- /dev/null
+++ b/benchmark/tpch/README.md
@@ -0,0 +1,63 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# TPC-H Benchmark
+
+## Prerequisites
+
+- Rust toolchain
+- Docker (for Hudi table creation)
+
+## Quick Start
+
+```bash
+# 1. Generate parquet data
+make tpch-generate SF=1
+
+# 2. Create Hudi COW tables from parquet (requires Docker)
+make tpch-create-tables SF=1
+
+# 3. Benchmark (results are automatically persisted)
+make bench-tpch ENGINE=datafusion SF=1
+make bench-tpch ENGINE=spark SF=1
+
+# 4. Compare results
+make tpch-compare ENGINES=datafusion,spark SF=1
+```
+
+## Options
+
+| Variable  | Values                                                  | Default      |
+|-----------|---------------------------------------------------------|--------------|
+| `ENGINE`  | `datafusion`, `spark`                                   | `datafusion` |
+| `SF`      | TPC-H scale factor                                      | `0.001`      |
+| `QUERIES` | Comma-separated query numbers                           | all 22       |
+| `MODE`    | `native`, `docker`                                      | `native`     |
+| `ENGINES` | Comma-separated engine names (for `tpch-compare`)       |              |
+
+## Examples
+
+```bash
+# Run only Q1, Q6, Q17
+make bench-tpch QUERIES=1,6,17 SF=10
+
+# Run inside Docker (same apache/spark:3.5.8 base image for fair comparison)
+make bench-tpch ENGINE=datafusion SF=1 MODE=docker
+make bench-tpch ENGINE=spark SF=1 MODE=docker
+```
diff --git a/.gitignore b/benchmark/tpch/config/sf1.yaml
similarity index 57%
copy from .gitignore
copy to benchmark/tpch/config/sf1.yaml
index b6e2fa9..32fc71f 100644
--- a/.gitignore
+++ b/benchmark/tpch/config/sf1.yaml
@@ -15,24 +15,30 @@
 # specific language governing permissions and limitations
 # under the License.
 
-/Cargo.lock
-/target
-**/target
+# TPC-H scale factor 1 configuration.
+# Per-scale-factor shuffle parallelism and Spark/Hudi tuning parameters.
 
-/.idea
-.vscode
+shuffle_parallelism:
+  customer: 1
+  lineitem: 1
+  nation: 1
+  orders: 1
+  part: 1
+  partsupp: 1
+  region: 1
+  supplier: 1
 
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
+# Spark configs for 'create-tables' command
+create_tables:
+  spark_conf:
+    spark.driver.memory: 3g
 
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+# Benchmark settings (shared across DataFusion and Spark)
+bench:
+  warmup: 1
+  iterations: 2
+  memory_limit: 3g
+  spark_conf:
+    spark.driver.memory: 3g
+    spark.sql.shuffle.partitions: "4"
+    spark.sql.autoBroadcastJoinThreshold: "33554432"
diff --git a/.gitignore b/benchmark/tpch/config/sf10.yaml
similarity index 57%
copy from .gitignore
copy to benchmark/tpch/config/sf10.yaml
index b6e2fa9..75d75d4 100644
--- a/.gitignore
+++ b/benchmark/tpch/config/sf10.yaml
@@ -15,24 +15,30 @@
 # specific language governing permissions and limitations
 # under the License.
 
-/Cargo.lock
-/target
-**/target
+# TPC-H scale factor 10 configuration.
+# Per-scale-factor shuffle parallelism and Spark/Hudi tuning parameters.
 
-/.idea
-.vscode
+shuffle_parallelism:
+  customer: 1
+  lineitem: 8
+  nation: 1
+  orders: 2
+  part: 1
+  partsupp: 2
+  region: 1
+  supplier: 1
 
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
+# Spark configs for 'create-tables' command
+create_tables:
+  spark_conf:
+    spark.driver.memory: 12g
 
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+# Benchmark settings (shared across DataFusion and Spark)
+bench:
+  warmup: 1
+  iterations: 3
+  memory_limit: 12g
+  spark_conf:
+    spark.driver.memory: 12g
+    spark.sql.shuffle.partitions: "16"
+    spark.sql.autoBroadcastJoinThreshold: "67108864"
diff --git a/.gitignore b/benchmark/tpch/config/sf100.yaml
similarity index 57%
copy from .gitignore
copy to benchmark/tpch/config/sf100.yaml
index b6e2fa9..6c243aa 100644
--- a/.gitignore
+++ b/benchmark/tpch/config/sf100.yaml
@@ -15,24 +15,30 @@
 # specific language governing permissions and limitations
 # under the License.
 
-/Cargo.lock
-/target
-**/target
+# TPC-H scale factor 100 configuration.
+# Per-scale-factor shuffle parallelism and Spark/Hudi tuning parameters.
 
-/.idea
-.vscode
+shuffle_parallelism:
+  customer: 3
+  lineitem: 73
+  nation: 1
+  orders: 17
+  part: 3
+  partsupp: 12
+  region: 1
+  supplier: 1
 
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
+# Spark configs for 'create-tables' command
+create_tables:
+  spark_conf:
+    spark.driver.memory: 60g
 
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+# Benchmark settings (shared across DataFusion and Spark)
+bench:
+  warmup: 2
+  iterations: 3
+  memory_limit: 60g
+  spark_conf:
+    spark.driver.memory: 60g
+    spark.sql.shuffle.partitions: "200"
+    spark.sql.autoBroadcastJoinThreshold: "134217728"
diff --git a/.gitignore b/benchmark/tpch/config/sf1000.yaml
similarity index 56%
copy from .gitignore
copy to benchmark/tpch/config/sf1000.yaml
index b6e2fa9..d600e55 100644
--- a/.gitignore
+++ b/benchmark/tpch/config/sf1000.yaml
@@ -15,24 +15,30 @@
 # specific language governing permissions and limitations
 # under the License.
 
-/Cargo.lock
-/target
-**/target
+# TPC-H scale factor 1000 configuration.
+# Per-scale-factor shuffle parallelism and Spark/Hudi tuning parameters.
 
-/.idea
-.vscode
+shuffle_parallelism:
+  customer: 12
+  lineitem: 292
+  nation: 1
+  orders: 68
+  part: 12
+  partsupp: 48
+  region: 1
+  supplier: 4
 
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
+# Spark configs for 'create-tables' command
+create_tables:
+  spark_conf:
+    spark.driver.memory: 120g
 
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+# Benchmark settings (shared across DataFusion and Spark)
+bench:
+  warmup: 2
+  iterations: 3
+  memory_limit: 120g
+  spark_conf:
+    spark.driver.memory: 120g
+    spark.sql.shuffle.partitions: "1600"
+    spark.sql.autoBroadcastJoinThreshold: "536870912"
diff --git a/benchmark/tpch/config/tables.yaml b/benchmark/tpch/config/tables.yaml
new file mode 100644
index 0000000..cefb465
--- /dev/null
+++ b/benchmark/tpch/config/tables.yaml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Common TPC-H table definitions shared across all scale factors.
+# Per-scale-factor configs (sf*.yaml) override shuffle_parallelism.
+
+tables:
+  customer:
+    primary_key: c_custkey
+    pre_combine_field: c_custkey
+    record_size_estimate: 87
+  lineitem:
+    primary_key: l_orderkey,l_linenumber
+    pre_combine_field: l_shipdate
+    record_size_estimate: 36
+  nation:
+    primary_key: n_nationkey
+    pre_combine_field: n_nationkey
+    record_size_estimate: 160
+  orders:
+    primary_key: o_orderkey
+    pre_combine_field: o_orderdate
+    record_size_estimate: 35
+  part:
+    primary_key: p_partkey
+    pre_combine_field: p_partkey
+    record_size_estimate: 32
+  partsupp:
+    primary_key: ps_partkey,ps_suppkey
+    pre_combine_field: ps_partkey
+    record_size_estimate: 48
+  region:
+    primary_key: r_regionkey
+    pre_combine_field: r_regionkey
+    record_size_estimate: 800
+  supplier:
+    primary_key: s_suppkey
+    pre_combine_field: s_suppkey
+    record_size_estimate: 88
diff --git a/benchmark/tpch/infra/Dockerfile b/benchmark/tpch/infra/Dockerfile
new file mode 100644
index 0000000..9875293
--- /dev/null
+++ b/benchmark/tpch/infra/Dockerfile
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# ---------- Stage 1: build the tpch binary ----------
+FROM rust:1.88-bookworm AS builder
+
+WORKDIR /build
+
+# Copy the full workspace (Cargo.toml, crates, benchmark, etc.)
+COPY Cargo.toml Cargo.lock ./
+COPY crates crates
+COPY benchmark/tpch benchmark/tpch
+# Stub out workspace members that aren't needed for the tpch build
+RUN mkdir -p cpp/src && echo "fn main() {}" > cpp/src/main.rs \
+ && printf '[package]\nname = "hudi-cpp"\nversion = "0.0.0"\nedition = "2024"\n' > cpp/Cargo.toml \
+ && mkdir -p python/src && touch python/src/lib.rs \
+ && printf '[package]\nname = "hudi-python"\nversion = "0.0.0"\nedition = "2024"\n' > python/Cargo.toml
+
+RUN cargo build -p tpch --release \
+ && strip target/release/tpch
+
+# ---------- Stage 2: runtime with Spark + Hudi + tpch ----------
+FROM apache/spark:3.5.8
+
+USER root
+
+# Download Hudi Spark bundle jar
+RUN cd /opt/spark/jars && \
+  wget -q \
+    https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.5-bundle_2.12/1.1.1/hudi-spark3.5-bundle_2.12-1.1.1.jar
+
+# Bake in Spark and Hudi defaults, and logging config
+COPY benchmark/tpch/infra/spark/spark-defaults.conf /opt/spark/conf/spark-defaults.conf
+COPY benchmark/tpch/infra/spark/log4j2.properties /opt/spark/conf/log4j2.properties
+COPY benchmark/tpch/infra/spark/hudi-defaults.conf /etc/hudi/conf/hudi-defaults.conf
+
+# Install tpch binary
+COPY --from=builder /build/target/release/tpch /usr/local/bin/tpch
+
+USER spark
+
+WORKDIR /opt/spark/work-dir
diff --git a/benchmark/tpch/infra/spark/bench.py b/benchmark/tpch/infra/spark/bench.py
new file mode 100644
index 0000000..e473c18
--- /dev/null
+++ b/benchmark/tpch/infra/spark/bench.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""TPC-H benchmark runner for Spark SQL with Hudi tables.
+
+Measures wall-clock time around spark.sql().collect() for each query iteration,
+mirroring how DataFusion benchmarks are measured.
+"""
+
+import argparse
+import json
+import os
+import time
+
+from pyspark.sql import SparkSession
+
+TPCH_TABLES = [
+    "customer", "lineitem", "nation", "orders",
+    "part", "partsupp", "region", "supplier",
+]
+
+
+def load_query(query_dir, query_num, scale_factor):
+    path = os.path.join(query_dir, f"q{query_num}.sql")
+    with open(path) as f:
+        sql = f.read()
+    q11_fraction = f"{0.0001 / scale_factor:.10f}"
+    return sql.replace("${Q11_FRACTION}", q11_fraction)
+
+
+def main():
+    parser = argparse.ArgumentParser(description="TPC-H Spark SQL benchmark")
+    parser.add_argument("--hudi-base", default=None)
+    parser.add_argument("--parquet-base", default=None)
+    parser.add_argument("--query-dir", required=True)
+    parser.add_argument("--scale-factor", type=float, default=1.0)
+    parser.add_argument("--queries", default=None, help="Comma-separated query numbers")
+    parser.add_argument("--warmup", type=int, required=True)
+    parser.add_argument("--iterations", type=int, required=True)
+    parser.add_argument("--output", required=True, help="Output file for JSON results")
+    args = parser.parse_args()
+
+    if not args.hudi_base and not args.parquet_base:
+        parser.error("at least one of --hudi-base or --parquet-base is required")
+
+    query_nums = list(range(1, 23))
+    if args.queries:
+        query_nums = [int(q.strip()) for q in args.queries.split(",")]
+
+    total_runs = args.warmup + args.iterations
+
+    print("Initializing Spark session...", flush=True)
+    spark = SparkSession.builder.getOrCreate()
+
+    # Register tables
+    if args.hudi_base:
+        for table in TPCH_TABLES:
+            print(f"  Registering Hudi table: {table}", flush=True)
+            spark.sql(
+                f"CREATE TABLE {table} USING hudi LOCATION '{args.hudi_base}/{table}'"
+            )
+    elif args.parquet_base:
+        for table in TPCH_TABLES:
+            print(f"  Registering Parquet table: {table}", flush=True)
+            spark.read.parquet(f"{args.parquet_base}/{table}").createOrReplaceTempView(table)
+
+    print(
+        f"Warmup: {args.warmup} iteration(s), Measured: {args.iterations} iteration(s)",
+        flush=True,
+    )
+
+    result_file = open(args.output, "w")
+
+    for qn in query_nums:
+        sql = load_query(args.query_dir, qn, args.scale_factor)
+        statements = [s.strip() for s in sql.split(";") if s.strip()]
+
+        for i in range(total_runs):
+            is_warmup = i < args.warmup
+            if is_warmup:
+                label = f"warmup {i + 1}/{args.warmup}"
+            else:
+                label = f"iter {i - args.warmup + 1}/{args.iterations}"
+            print(f"  Q{qn:02d} {label}...", end="", flush=True)
+
+            start = time.time()
+            for stmt in statements:
+                spark.sql(stmt).collect()
+            elapsed_ms = (time.time() - start) * 1000.0
+
+            print(f" {elapsed_ms:.1f}ms", flush=True)
+
+            if not is_warmup:
+                result_file.write(json.dumps({"query": qn, "elapsed_ms": elapsed_ms}) + "\n")
+                result_file.flush()
+
+    result_file.close()
+    spark.stop()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/.gitignore b/benchmark/tpch/infra/spark/hudi-defaults.conf
similarity index 81%
copy from .gitignore
copy to benchmark/tpch/infra/spark/hudi-defaults.conf
index b6e2fa9..9d42b24 100644
--- a/.gitignore
+++ b/benchmark/tpch/infra/spark/hudi-defaults.conf
@@ -15,24 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-/Cargo.lock
-/target
-**/target
-
-/.idea
-.vscode
-
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
-
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+hoodie.bulkinsert.sort.mode=GLOBAL_SORT
+hoodie.parquet.max.file.size=1073741824
+hoodie.parquet.small.file.limit=0
+hoodie.bulkinsert.shuffle.parallelism=1
diff --git a/.gitignore b/benchmark/tpch/infra/spark/log4j2.properties
similarity index 73%
copy from .gitignore
copy to benchmark/tpch/infra/spark/log4j2.properties
index b6e2fa9..f3de794 100644
--- a/.gitignore
+++ b/benchmark/tpch/infra/spark/log4j2.properties
@@ -15,24 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
-/Cargo.lock
-/target
-**/target
+rootLogger.level = error
+rootLogger.appenderRef.stdout.ref = console
 
-/.idea
-.vscode
-
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
-
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+appender.console.type = Console
+appender.console.name = console
+appender.console.target = SYSTEM_ERR
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %-5p %c{1}: %m%n
diff --git a/.gitignore b/benchmark/tpch/infra/spark/spark-defaults.conf
similarity index 67%
copy from .gitignore
copy to benchmark/tpch/infra/spark/spark-defaults.conf
index b6e2fa9..ceac52c 100644
--- a/.gitignore
+++ b/benchmark/tpch/infra/spark/spark-defaults.conf
@@ -15,24 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-/Cargo.lock
-/target
-**/target
-
-/.idea
-.vscode
-
-# python
-.venv
-venv
-**/.python-version
-__pycache__
-uv.lock
-
-# macOS
-**/.DS_Store
-
-# coverage files
-*.profraw
-cobertura.xml
-/cov-reports/
+spark.serializer                            org.apache.spark.serializer.KryoSerializer
+spark.sql.catalog.spark_catalog             org.apache.spark.sql.hudi.catalog.HoodieCatalog
+spark.sql.extensions                        org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+spark.sql.files.maxPartitionBytes           1073741824
+spark.sql.parquet.aggregatePushdown         true
diff --git a/benchmark/tpch/queries/q1.sql b/benchmark/tpch/queries/q1.sql
new file mode 100644
index 0000000..0dc4c3e
--- /dev/null
+++ b/benchmark/tpch/queries/q1.sql
@@ -0,0 +1,23 @@
+-- SQLBench-H query 1 derived from TPC-H query 1 under the terms of the TPC Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
+select
+       l_returnflag,
+       l_linestatus,
+       sum(l_quantity) as sum_qty,
+       sum(l_extendedprice) as sum_base_price,
+       sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
+       sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
+       avg(l_quantity) as avg_qty,
+       avg(l_extendedprice) as avg_price,
+       avg(l_discount) as avg_disc,
+       count(*) as count_order
+from
+       lineitem
+where
+       l_shipdate <= date '1998-12-01' - interval '68 days'
+group by
+       l_returnflag,
+       l_linestatus
+order by
+       l_returnflag,
+       l_linestatus;
diff --git a/benchmark/tpch/queries/q10.sql b/benchmark/tpch/queries/q10.sql
new file mode 100644
index 0000000..576338f
--- /dev/null
+++ b/benchmark/tpch/queries/q10.sql
@@ -0,0 +1,33 @@
+-- SQLBench-H query 10 derived from TPC-H query 10 under the terms of the TPC Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
+select
+       c_custkey,
+       c_name,
+       sum(l_extendedprice * (1 - l_discount)) as revenue,
+       c_acctbal,
+       n_name,
+       c_address,
+       c_phone,
+       c_comment
+from
+       customer,
+       orders,
+       lineitem,
+       nation
+where
+       c_custkey = o_custkey
+       and l_orderkey = o_orderkey
+       and o_orderdate >= date '1993-07-01'
+       and o_orderdate < date '1993-07-01' + interval '3' month
+       and l_returnflag = 'R'
+       and c_nationkey = n_nationkey
+group by
+       c_custkey,
+       c_name,
+       c_acctbal,
+       c_phone,
+       n_name,
+       c_address,
+       c_comment
+order by
+       revenue desc limit 20;
diff --git a/benchmark/tpch/queries/q11.sql b/benchmark/tpch/queries/q11.sql
new file mode 100644
index 0000000..5e31dad
--- /dev/null
+++ b/benchmark/tpch/queries/q11.sql
@@ -0,0 +1,29 @@
+-- SQLBench-H query 11 derived from TPC-H query 11 under the terms of the TPC Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
+select
+       ps_partkey,
+       sum(ps_supplycost * ps_availqty) as value
+from
+       partsupp,
+       supplier,
+       nation
+where
+       ps_suppkey = s_suppkey
+       and s_nationkey = n_nationkey
+       and n_name = 'ALGERIA'
+group by
+       ps_partkey having
+               sum(ps_supplycost * ps_availqty) > (
+                       select
+                               sum(ps_supplycost * ps_availqty) * ${Q11_FRACTION}
+                       from
+                               partsupp,
+                               supplier,
+                               nation
+                       where
+                               ps_suppkey = s_suppkey
+                               and s_nationkey = n_nationkey
+                               and n_name = 'ALGERIA'
+               )
+order by
+       value desc;
diff --git a/benchmark/tpch/queries/q12.sql b/benchmark/tpch/queries/q12.sql
new file mode 100644
index 0000000..4ab4ea6
--- /dev/null
+++ b/benchmark/tpch/queries/q12.sql
@@ -0,0 +1,30 @@
+-- SQLBench-H query 12 derived from TPC-H query 12 under the terms of the TPC Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
+select
+       l_shipmode,
+       sum(case
+               when o_orderpriority = '1-URGENT'
+                       or o_orderpriority = '2-HIGH'
+                       then 1
+               else 0
+       end) as high_line_count,
+       sum(case
+               when o_orderpriority <> '1-URGENT'
+                       and o_orderpriority <> '2-HIGH'
+                       then 1
+               else 0
+       end) as low_line_count
+from
+       orders,
+       lineitem
+where
+       o_orderkey = l_orderkey
+       and l_shipmode in ('FOB', 'SHIP')
+       and l_commitdate < l_receiptdate
+       and l_shipdate < l_commitdate
+       and l_receiptdate >= date '1995-01-01'
+       and l_receiptdate < date '1995-01-01' + interval '1' year
+group by
+       l_shipmode
+order by
+       l_shipmode;
diff --git a/benchmark/tpch/queries/q13.sql b/benchmark/tpch/queries/q13.sql
new file mode 100644
index 0000000..301e35d
--- /dev/null
+++ b/benchmark/tpch/queries/q13.sql
@@ -0,0 +1,22 @@
+-- SQLBench-H query 13 derived from TPC-H query 13 under the terms of the TPC Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance Council.
+select
+       c_count,
+       count(*) as custdist
+from
+       (
+               select
+                       c_custkey,
+                       count(o_orderkey)
+               from
+                       customer left outer join orders on
+                               c_custkey = o_custkey
+                               and o_comment not like '%express%requests%'
+               group by
+                       c_custkey
+       ) as c_orders (c_custkey, c_count)
+group by
+       c_count
+order by
+       custdist desc,
+       c_count desc;
diff --git a/benchmark/tpch/queries/q14.sql b/benchmark/tpch/queries/q14.sql
new file mode 100644
index 0000000..6040ac7
--- /dev/null
+++ b/benchmark/tpch/queries/q14.sql
@@ -0,0 +1,15 @@
+-- SQLBench-H query 14 derived from TPC-H query 14 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       100.00 * sum(case
+               when p_type like 'PROMO%'
+                       then l_extendedprice * (1 - l_discount)
+               else 0
+       end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue
+from
+       lineitem,
+       part
+where
+       l_partkey = p_partkey
+       and l_shipdate >= date '1995-02-01'
+       and l_shipdate < date '1995-02-01' + interval '1' month;
diff --git a/benchmark/tpch/queries/q15.sql b/benchmark/tpch/queries/q15.sql
new file mode 100644
index 0000000..c656dc3
--- /dev/null
+++ b/benchmark/tpch/queries/q15.sql
@@ -0,0 +1,33 @@
+-- SQLBench-H query 15 derived from TPC-H query 15 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+create view revenue0 (supplier_no, total_revenue) as
+       select
+               l_suppkey,
+               sum(l_extendedprice * (1 - l_discount)) as total_revenue
+       from
+               lineitem
+       where
+               l_shipdate >= date '1996-08-01'
+               and l_shipdate < date '1996-08-01' + interval '3' month
+       group by
+               l_suppkey;
+select
+       s_suppkey,
+       s_name,
+       s_address,
+       s_phone,
+       total_revenue
+from
+       supplier,
+       revenue0
+where
+       s_suppkey = supplier_no
+       and total_revenue = (
+               select
+                       max(total_revenue)
+               from
+                       revenue0
+       )
+order by
+       s_suppkey;
+drop view revenue0;
diff --git a/benchmark/tpch/queries/q16.sql b/benchmark/tpch/queries/q16.sql
new file mode 100644
index 0000000..7fdf365
--- /dev/null
+++ b/benchmark/tpch/queries/q16.sql
@@ -0,0 +1,32 @@
+-- SQLBench-H query 16 derived from TPC-H query 16 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       p_brand,
+       p_type,
+       p_size,
+       count(distinct ps_suppkey) as supplier_cnt
+from
+       partsupp,
+       part
+where
+       p_partkey = ps_partkey
+       and p_brand <> 'Brand#14'
+       and p_type not like 'SMALL PLATED%'
+       and p_size in (14, 6, 5, 31, 49, 15, 41, 47)
+       and ps_suppkey not in (
+               select
+                       s_suppkey
+               from
+                       supplier
+               where
+                       s_comment like '%Customer%Complaints%'
+       )
+group by
+       p_brand,
+       p_type,
+       p_size
+order by
+       supplier_cnt desc,
+       p_brand,
+       p_type,
+       p_size;
diff --git a/benchmark/tpch/queries/q17.sql b/benchmark/tpch/queries/q17.sql
new file mode 100644
index 0000000..ffa0f15
--- /dev/null
+++ b/benchmark/tpch/queries/q17.sql
@@ -0,0 +1,19 @@
+-- SQLBench-H query 17 derived from TPC-H query 17 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       sum(l_extendedprice) / 7.0 as avg_yearly
+from
+       lineitem,
+       part
+where
+       p_partkey = l_partkey
+       and p_brand = 'Brand#42'
+       and p_container = 'LG BAG'
+       and l_quantity < (
+               select
+                       0.2 * avg(l_quantity)
+               from
+                       lineitem
+               where
+                       l_partkey = p_partkey
+       );
diff --git a/benchmark/tpch/queries/q18.sql b/benchmark/tpch/queries/q18.sql
new file mode 100644
index 0000000..f4ab194
--- /dev/null
+++ b/benchmark/tpch/queries/q18.sql
@@ -0,0 +1,34 @@
+-- SQLBench-H query 18 derived from TPC-H query 18 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       c_name,
+       c_custkey,
+       o_orderkey,
+       o_orderdate,
+       o_totalprice,
+       sum(l_quantity)
+from
+       customer,
+       orders,
+       lineitem
+where
+       o_orderkey in (
+               select
+                       l_orderkey
+               from
+                       lineitem
+               group by
+                       l_orderkey having
+                               sum(l_quantity) > 313
+       )
+       and c_custkey = o_custkey
+       and o_orderkey = l_orderkey
+group by
+       c_name,
+       c_custkey,
+       o_orderkey,
+       o_orderdate,
+       o_totalprice
+order by
+       o_totalprice desc,
+       o_orderdate limit 100;
diff --git a/benchmark/tpch/queries/q19.sql b/benchmark/tpch/queries/q19.sql
new file mode 100644
index 0000000..ad5fb7d
--- /dev/null
+++ b/benchmark/tpch/queries/q19.sql
@@ -0,0 +1,37 @@
+-- SQLBench-H query 19 derived from TPC-H query 19 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       sum(l_extendedprice* (1 - l_discount)) as revenue
+from
+       lineitem,
+       part
+where
+       (
+               p_partkey = l_partkey
+               and p_brand = 'Brand#21'
+               and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
+               and l_quantity >= 8 and l_quantity <= 8 + 10
+               and p_size between 1 and 5
+               and l_shipmode in ('AIR', 'AIR REG')
+               and l_shipinstruct = 'DELIVER IN PERSON'
+       )
+       or
+       (
+               p_partkey = l_partkey
+               and p_brand = 'Brand#13'
+               and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
+               and l_quantity >= 20 and l_quantity <= 20 + 10
+               and p_size between 1 and 10
+               and l_shipmode in ('AIR', 'AIR REG')
+               and l_shipinstruct = 'DELIVER IN PERSON'
+       )
+       or
+       (
+               p_partkey = l_partkey
+               and p_brand = 'Brand#52'
+               and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
+               and l_quantity >= 30 and l_quantity <= 30 + 10
+               and p_size between 1 and 15
+               and l_shipmode in ('AIR', 'AIR REG')
+               and l_shipinstruct = 'DELIVER IN PERSON'
+       );
diff --git a/benchmark/tpch/queries/q2.sql b/benchmark/tpch/queries/q2.sql
new file mode 100644
index 0000000..2936532
--- /dev/null
+++ b/benchmark/tpch/queries/q2.sql
@@ -0,0 +1,45 @@
+-- SQLBench-H query 2 derived from TPC-H query 2 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       s_acctbal,
+       s_name,
+       n_name,
+       p_partkey,
+       p_mfgr,
+       s_address,
+       s_phone,
+       s_comment
+from
+       part,
+       supplier,
+       partsupp,
+       nation,
+       region
+where
+       p_partkey = ps_partkey
+       and s_suppkey = ps_suppkey
+       and p_size = 48
+       and p_type like '%TIN'
+       and s_nationkey = n_nationkey
+       and n_regionkey = r_regionkey
+       and r_name = 'ASIA'
+       and ps_supplycost = (
+               select
+                       min(ps_supplycost)
+               from
+                       partsupp,
+                       supplier,
+                       nation,
+                       region
+               where
+                       p_partkey = ps_partkey
+                       and s_suppkey = ps_suppkey
+                       and s_nationkey = n_nationkey
+                       and n_regionkey = r_regionkey
+                       and r_name = 'ASIA'
+       )
+order by
+       s_acctbal desc,
+       n_name,
+       s_name,
+       p_partkey limit 100;
diff --git a/benchmark/tpch/queries/q20.sql b/benchmark/tpch/queries/q20.sql
new file mode 100644
index 0000000..3136ca3
--- /dev/null
+++ b/benchmark/tpch/queries/q20.sql
@@ -0,0 +1,39 @@
+-- SQLBench-H query 20 derived from TPC-H query 20 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       s_name,
+       s_address
+from
+       supplier,
+       nation
+where
+       s_suppkey in (
+               select
+                       ps_suppkey
+               from
+                       partsupp
+               where
+                       ps_partkey in (
+                               select
+                                       p_partkey
+                               from
+                                       part
+                               where
+                                       p_name like 'blanched%'
+                       )
+                       and ps_availqty > (
+                               select
+                                       0.5 * sum(l_quantity)
+                               from
+                                       lineitem
+                               where
+                                       l_partkey = ps_partkey
+                                       and l_suppkey = ps_suppkey
+                                       and l_shipdate >= date '1993-01-01'
+                                       and l_shipdate < date '1993-01-01' + 
interval '1' year
+                       )
+       )
+       and s_nationkey = n_nationkey
+       and n_name = 'KENYA'
+order by
+       s_name;
diff --git a/benchmark/tpch/queries/q21.sql b/benchmark/tpch/queries/q21.sql
new file mode 100644
index 0000000..0170469
--- /dev/null
+++ b/benchmark/tpch/queries/q21.sql
@@ -0,0 +1,41 @@
+-- SQLBench-H query 21 derived from TPC-H query 21 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       s_name,
+       count(*) as numwait
+from
+       supplier,
+       lineitem l1,
+       orders,
+       nation
+where
+       s_suppkey = l1.l_suppkey
+       and o_orderkey = l1.l_orderkey
+       and o_orderstatus = 'F'
+       and l1.l_receiptdate > l1.l_commitdate
+       and exists (
+               select
+                       *
+               from
+                       lineitem l2
+               where
+                       l2.l_orderkey = l1.l_orderkey
+                       and l2.l_suppkey <> l1.l_suppkey
+       )
+       and not exists (
+               select
+                       *
+               from
+                       lineitem l3
+               where
+                       l3.l_orderkey = l1.l_orderkey
+                       and l3.l_suppkey <> l1.l_suppkey
+                       and l3.l_receiptdate > l3.l_commitdate
+       )
+       and s_nationkey = n_nationkey
+       and n_name = 'ARGENTINA'
+group by
+       s_name
+order by
+       numwait desc,
+       s_name limit 100;
diff --git a/benchmark/tpch/queries/q22.sql b/benchmark/tpch/queries/q22.sql
new file mode 100644
index 0000000..8d528ef
--- /dev/null
+++ b/benchmark/tpch/queries/q22.sql
@@ -0,0 +1,39 @@
+-- SQLBench-H query 22 derived from TPC-H query 22 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       cntrycode,
+       count(*) as numcust,
+       sum(c_acctbal) as totacctbal
+from
+       (
+               select
+                       substring(c_phone from 1 for 2) as cntrycode,
+                       c_acctbal
+               from
+                       customer
+               where
+                       substring(c_phone from 1 for 2) in
+                               ('24', '34', '16', '30', '33', '14', '13')
+                       and c_acctbal > (
+                               select
+                                       avg(c_acctbal)
+                               from
+                                       customer
+                               where
+                                       c_acctbal > 0.00
+                                       and substring(c_phone from 1 for 2) in
+                                               ('24', '34', '16', '30', '33', 
'14', '13')
+                       )
+                       and not exists (
+                               select
+                                       *
+                               from
+                                       orders
+                               where
+                                       o_custkey = c_custkey
+                       )
+       ) as custsale
+group by
+       cntrycode
+order by
+       cntrycode;
diff --git a/benchmark/tpch/queries/q3.sql b/benchmark/tpch/queries/q3.sql
new file mode 100644
index 0000000..b60be7f
--- /dev/null
+++ b/benchmark/tpch/queries/q3.sql
@@ -0,0 +1,24 @@
+-- SQLBench-H query 3 derived from TPC-H query 3 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       l_orderkey,
+       sum(l_extendedprice * (1 - l_discount)) as revenue,
+       o_orderdate,
+       o_shippriority
+from
+       customer,
+       orders,
+       lineitem
+where
+       c_mktsegment = 'BUILDING'
+       and c_custkey = o_custkey
+       and l_orderkey = o_orderkey
+       and o_orderdate < date '1995-03-15'
+       and l_shipdate > date '1995-03-15'
+group by
+       l_orderkey,
+       o_orderdate,
+       o_shippriority
+order by
+       revenue desc,
+       o_orderdate limit 10;
diff --git a/benchmark/tpch/queries/q4.sql b/benchmark/tpch/queries/q4.sql
new file mode 100644
index 0000000..05fae97
--- /dev/null
+++ b/benchmark/tpch/queries/q4.sql
@@ -0,0 +1,23 @@
+-- SQLBench-H query 4 derived from TPC-H query 4 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       o_orderpriority,
+       count(*) as order_count
+from
+       orders
+where
+       o_orderdate >= date '1995-04-01'
+       and o_orderdate < date '1995-04-01' + interval '3' month
+       and exists (
+               select
+                       *
+               from
+                       lineitem
+               where
+                       l_orderkey = o_orderkey
+                       and l_commitdate < l_receiptdate
+       )
+group by
+       o_orderpriority
+order by
+       o_orderpriority;
diff --git a/benchmark/tpch/queries/q5.sql b/benchmark/tpch/queries/q5.sql
new file mode 100644
index 0000000..4b97ef0
--- /dev/null
+++ b/benchmark/tpch/queries/q5.sql
@@ -0,0 +1,26 @@
+-- SQLBench-H query 5 derived from TPC-H query 5 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       n_name,
+       sum(l_extendedprice * (1 - l_discount)) as revenue
+from
+       customer,
+       orders,
+       lineitem,
+       supplier,
+       nation,
+       region
+where
+       c_custkey = o_custkey
+       and l_orderkey = o_orderkey
+       and l_suppkey = s_suppkey
+       and c_nationkey = s_nationkey
+       and s_nationkey = n_nationkey
+       and n_regionkey = r_regionkey
+       and r_name = 'AFRICA'
+       and o_orderdate >= date '1994-01-01'
+       and o_orderdate < date '1994-01-01' + interval '1' year
+group by
+       n_name
+order by
+       revenue desc;
diff --git a/benchmark/tpch/queries/q6.sql b/benchmark/tpch/queries/q6.sql
new file mode 100644
index 0000000..f5b4bae
--- /dev/null
+++ b/benchmark/tpch/queries/q6.sql
@@ -0,0 +1,11 @@
+-- SQLBench-H query 6 derived from TPC-H query 6 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       sum(l_extendedprice * l_discount) as revenue
+from
+       lineitem
+where
+       l_shipdate >= date '1994-01-01'
+       and l_shipdate < date '1994-01-01' + interval '1' year
+       and l_discount between 0.04 - 0.01 and 0.04 + 0.01
+       and l_quantity < 24;
diff --git a/benchmark/tpch/queries/q7.sql b/benchmark/tpch/queries/q7.sql
new file mode 100644
index 0000000..f3919be
--- /dev/null
+++ b/benchmark/tpch/queries/q7.sql
@@ -0,0 +1,41 @@
+-- SQLBench-H query 7 derived from TPC-H query 7 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       supp_nation,
+       cust_nation,
+       l_year,
+       sum(volume) as revenue
+from
+       (
+               select
+                       n1.n_name as supp_nation,
+                       n2.n_name as cust_nation,
+                       extract(year from l_shipdate) as l_year,
+                       l_extendedprice * (1 - l_discount) as volume
+               from
+                       supplier,
+                       lineitem,
+                       orders,
+                       customer,
+                       nation n1,
+                       nation n2
+               where
+                       s_suppkey = l_suppkey
+                       and o_orderkey = l_orderkey
+                       and c_custkey = o_custkey
+                       and s_nationkey = n1.n_nationkey
+                       and c_nationkey = n2.n_nationkey
+                       and (
+                               (n1.n_name = 'GERMANY' and n2.n_name = 'IRAQ')
+                               or (n1.n_name = 'IRAQ' and n2.n_name = 
'GERMANY')
+                       )
+                       and l_shipdate between date '1995-01-01' and date 
'1996-12-31'
+       ) as shipping
+group by
+       supp_nation,
+       cust_nation,
+       l_year
+order by
+       supp_nation,
+       cust_nation,
+       l_year;
diff --git a/benchmark/tpch/queries/q8.sql b/benchmark/tpch/queries/q8.sql
new file mode 100644
index 0000000..7c53e14
--- /dev/null
+++ b/benchmark/tpch/queries/q8.sql
@@ -0,0 +1,39 @@
+-- SQLBench-H query 8 derived from TPC-H query 8 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       o_year,
+       sum(case
+               when nation = 'IRAQ' then volume
+               else 0
+       end) / sum(volume) as mkt_share
+from
+       (
+               select
+                       extract(year from o_orderdate) as o_year,
+                       l_extendedprice * (1 - l_discount) as volume,
+                       n2.n_name as nation
+               from
+                       part,
+                       supplier,
+                       lineitem,
+                       orders,
+                       customer,
+                       nation n1,
+                       nation n2,
+                       region
+               where
+                       p_partkey = l_partkey
+                       and s_suppkey = l_suppkey
+                       and l_orderkey = o_orderkey
+                       and o_custkey = c_custkey
+                       and c_nationkey = n1.n_nationkey
+                       and n1.n_regionkey = r_regionkey
+                       and r_name = 'MIDDLE EAST'
+                       and s_nationkey = n2.n_nationkey
+                       and o_orderdate between date '1995-01-01' and date 
'1996-12-31'
+                       and p_type = 'LARGE PLATED STEEL'
+       ) as all_nations
+group by
+       o_year
+order by
+       o_year;
diff --git a/benchmark/tpch/queries/q9.sql b/benchmark/tpch/queries/q9.sql
new file mode 100644
index 0000000..2455695
--- /dev/null
+++ b/benchmark/tpch/queries/q9.sql
@@ -0,0 +1,34 @@
+-- SQLBench-H query 9 derived from TPC-H query 9 under the terms of the TPC 
Fair Use Policy.
+-- TPC-H queries are Copyright 1993-2022 Transaction Processing Performance 
Council.
+select
+       nation,
+       o_year,
+       sum(amount) as sum_profit
+from
+       (
+               select
+                       n_name as nation,
+                       extract(year from o_orderdate) as o_year,
+                       l_extendedprice * (1 - l_discount) - ps_supplycost * 
l_quantity as amount
+               from
+                       part,
+                       supplier,
+                       lineitem,
+                       partsupp,
+                       orders,
+                       nation
+               where
+                       s_suppkey = l_suppkey
+                       and ps_suppkey = l_suppkey
+                       and ps_partkey = l_partkey
+                       and p_partkey = l_partkey
+                       and o_orderkey = l_orderkey
+                       and s_nationkey = n_nationkey
+                       and p_name like '%moccasin%'
+       ) as profit
+group by
+       nation,
+       o_year
+order by
+       nation,
+       o_year desc;
diff --git a/benchmark/tpch/run.sh b/benchmark/tpch/run.sh
new file mode 100755
index 0000000..714000c
--- /dev/null
+++ b/benchmark/tpch/run.sh
@@ -0,0 +1,458 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
+
+DEFAULT_SCALE_FACTOR=1
+DOCKER_IMAGE="tpch-bench"
+TPCH_BIN="$REPO_ROOT/target/release/tpch"
+MODE="${MODE:-native}"
+
+# Build the `tpch` CLI in release mode so $TPCH_BIN exists and is up to date.
+build_tpch() {
+  echo "Building TPC-H tool..."
+  cargo build -p tpch --release --manifest-path "$REPO_ROOT/Cargo.toml"
+}
+
+# Build the benchmark Docker image tagged $DOCKER_IMAGE.
+# The build context is the repo root so the Dockerfile can copy project sources.
+build_docker() {
+  echo "Building TPC-H Docker image..."
+  docker build -t "$DOCKER_IMAGE" -f "$SCRIPT_DIR/infra/Dockerfile" 
"$REPO_ROOT"
+}
+
+# Read spark-submit args emitted by the tpch binary (one token per line) into
+# the global SPARK_ARGS array; all arguments are forwarded verbatim to
+# `tpch spark-args`. The while-read loop (rather than mapfile) keeps this
+# compatible with bash 3.2 (macOS default shell).
+read_spark_args() {
+  SPARK_ARGS=()
+  while IFS= read -r line; do
+    SPARK_ARGS+=("$line")
+  done < <("$TPCH_BIN" spark-args "$@")
+}
+
+# Validate a local Spark installation ($SPARK_HOME) and install the benchmark
+# Spark config files into it. Used only when MODE=native.
+# NOTE(review): this overwrites any existing spark-defaults.conf and
+# log4j2.properties under $SPARK_HOME/conf without backup — confirm acceptable.
+setup_spark_native() {
+  if [ -z "${SPARK_HOME:-}" ]; then
+    echo "Error: SPARK_HOME is not set. Set it to your Spark installation 
directory." >&2
+    exit 1
+  fi
+  if [ ! -x "$SPARK_HOME/bin/spark-submit" ]; then
+    echo "Error: $SPARK_HOME/bin/spark-submit not found or not executable." >&2
+    exit 1
+  fi
+
+  echo "Configuring Spark at $SPARK_HOME..."
+  cp "$SCRIPT_DIR/infra/spark/spark-defaults.conf" 
"$SPARK_HOME/conf/spark-defaults.conf"
+  cp "$SCRIPT_DIR/infra/spark/log4j2.properties" 
"$SPARK_HOME/conf/log4j2.properties"
+}
+
+# Print CLI help and option reference. The heredoc is intentionally unquoted
+# so $0 and $DEFAULT_SCALE_FACTOR are expanded.
+usage() {
+  cat <<EOF
+Usage: $0 <command> [options]
+
+Commands:
+  generate          Generate TPC-H parquet data
+  create-tables     Create Hudi COW tables from parquet via Spark (Docker)
+  bench-spark       Run TPC-H queries against Hudi tables via Spark SQL
+  bench-datafusion  Run TPC-H queries against Hudi tables via DataFusion
+  compare           Compare persisted benchmark results with bar charts
+
+Environment:
+  MODE              Execution mode: native (default) or docker
+
+Options:
+  --scale-factor N  TPC-H scale factor (default: $DEFAULT_SCALE_FACTOR)
+  --format F        Table format: hudi or parquet (default: hudi for
+                    bench-spark, auto-detect for bench-datafusion)
+  --queries Q       Comma-separated query numbers (default: all 22)
+  --iterations N    Number of measured iterations per query (from config)
+  --warmup N        Number of unmeasured warmup iterations per query (from config)
+  --output-dir D    Directory to persist results as JSON (bench commands only)
+  --engines E       Comma-separated engine names to compare (compare command only)
+
+Examples:
+  $0 generate --scale-factor 1
+  $0 create-tables --scale-factor 1
+  MODE=native $0 bench-spark --scale-factor 1 --queries 1,3,6
+  MODE=native $0 bench-datafusion --scale-factor 1 --queries 1,3,6
+  $0 bench-datafusion --scale-factor 1 --output-dir results
+  $0 compare --scale-factor 1 --engines datafusion,spark --format hudi
+EOF
+}
+
+# --- Commands ---
+
+# Generate TPC-H parquet data at the requested scale factor, removing any
+# previously generated dataset first so runs start from a clean slate.
+cmd_generate() {
+  local sf="$DEFAULT_SCALE_FACTOR"
+  while [[ $# -gt 0 ]]; do
+    case "$1" in
+      --scale-factor) sf="$2"; shift 2 ;;
+      *) echo "Unknown option: $1" >&2; usage; exit 1 ;;
+    esac
+  done
+
+  local parquet_dir="$SCRIPT_DIR/data/sf$sf-parquet"
+  if [ -d "$parquet_dir" ]; then
+    echo "Removing existing parquet data at $parquet_dir..."
+    rm -rf "$parquet_dir"
+  fi
+
+  build_tpch
+  "$TPCH_BIN" generate --scale-factor "$sf"
+}
+
+# Create Hudi COW tables from previously generated parquet data by rendering a
+# CTAS SQL script with the tpch binary and running it via spark-sql in Docker.
+# NOTE(review): always runs through Docker regardless of MODE — confirm intended.
+cmd_create_tables() {
+  local sf="$DEFAULT_SCALE_FACTOR"
+  while [[ $# -gt 0 ]]; do
+    case "$1" in
+      --scale-factor) sf="$2"; shift 2 ;;
+      *) echo "Unknown option: $1" >&2; usage; exit 1 ;;
+    esac
+  done
+
+  local parquet_dir="$SCRIPT_DIR/data/sf$sf-parquet"
+  if [ ! -d "$parquet_dir" ]; then
+    echo "Error: parquet data not found at $parquet_dir. Run 'generate' 
first." >&2
+    exit 1
+  fi
+
+  # Existing Hudi data is wiped so table creation is idempotent.
+  local hudi_dir="$SCRIPT_DIR/data/sf$sf-hudi"
+  if [ -d "$hudi_dir" ]; then
+    echo "Removing existing Hudi data at $hudi_dir..."
+    rm -rf "$hudi_dir"
+  fi
+
+  build_tpch
+  build_docker
+  mkdir -p "$hudi_dir"
+
+  # Render the CTAS script against the in-container mount points
+  # (/opt/parquet, /opt/hudi), not the host paths.
+  local sql_file
+  sql_file="$(mktemp)"
+  "$TPCH_BIN" render-ctas --scale-factor "$sf" \
+    --parquet-base /opt/parquet --hudi-base /opt/hudi > "$sql_file"
+
+  read_spark_args --scale-factor "$sf" --command create-tables
+
+  echo "Creating Hudi COW tables from parquet (sf$sf)..."
+  # Capture the exit code with || so set -e does not abort before the temp
+  # SQL file is cleaned up below.
+  local docker_exit=0
+  docker run --rm \
+    -v "$parquet_dir:/opt/parquet:ro" \
+    -v "$hudi_dir:/opt/hudi" \
+    -v "$sql_file:/opt/spark/work-dir/create_hudi_tables.sql:ro" \
+    "$DOCKER_IMAGE" \
+    /opt/spark/bin/spark-sql "${SPARK_ARGS[@]}" \
+    -f /opt/spark/work-dir/create_hudi_tables.sql \
+    || docker_exit=$?
+
+  rm -f "$sql_file"
+  if [ $docker_exit -ne 0 ]; then
+    echo "Error: Spark SQL failed with exit code $docker_exit" >&2
+    return $docker_exit
+  fi
+  echo "Hudi COW tables created at: $hudi_dir"
+}
+
+# Run the TPC-H queries through Spark SQL (bench.py), either against a local
+# Spark install (MODE=native) or inside the benchmark Docker image. Results
+# are written as JSONL and post-processed by `tpch parse-spark-output`.
+cmd_bench_spark() {
+  local sf="$DEFAULT_SCALE_FACTOR"
+  local queries=""
+  local iterations=""
+  local warmup=""
+  local output_dir=""
+  local format="hudi"
+  while [[ $# -gt 0 ]]; do
+    case "$1" in
+      --scale-factor) sf="$2"; shift 2 ;;
+      --queries) queries="$2"; shift 2 ;;
+      --iterations) iterations="$2"; shift 2 ;;
+      --warmup) warmup="$2"; shift 2 ;;
+      --output-dir) output_dir="$2"; shift 2 ;;
+      --format) format="$2"; shift 2 ;;
+      *) echo "Unknown option: $1" >&2; usage; exit 1 ;;
+    esac
+  done
+
+  build_tpch
+
+  # Read defaults from config via the tpch binary; CLI flags take precedence.
+  # bench-defaults prints "<warmup> <iterations>" on one line.
+  if [ -z "$warmup" ] || [ -z "$iterations" ]; then
+    local defaults
+    defaults=$("$TPCH_BIN" bench-defaults --scale-factor "$sf")
+    local cfg_warmup cfg_iterations
+    cfg_warmup=$(echo "$defaults" | awk '{print $1}')
+    cfg_iterations=$(echo "$defaults" | awk '{print $2}')
+    warmup="${warmup:-$cfg_warmup}"
+    iterations="${iterations:-$cfg_iterations}"
+  fi
+
+  local hudi_dir="$SCRIPT_DIR/data/sf$sf-hudi"
+  local parquet_dir="$SCRIPT_DIR/data/sf$sf-parquet"
+
+  # Select data directory and the matching bench.py base-path flag.
+  local data_dir=""
+  local bench_data_arg=""
+  case "$format" in
+    hudi)
+      data_dir="$hudi_dir"
+      bench_data_arg="--hudi-base"
+      ;;
+    parquet)
+      data_dir="$parquet_dir"
+      bench_data_arg="--parquet-base"
+      ;;
+    *) echo "Error: unknown format '$format'. Use 'hudi' or 'parquet'." >&2; 
exit 1 ;;
+  esac
+
+  if [ ! -d "$data_dir" ]; then
+    echo "Error: $format data not found at $data_dir." >&2
+    exit 1
+  fi
+
+  read_spark_args --scale-factor "$sf" --command bench
+
+  local tmp_dir
+  tmp_dir="$(mktemp -d)"
+  local output_file="$tmp_dir/results.jsonl"
+
+  if [ "$MODE" = "native" ]; then
+    setup_spark_native
+
+    # $bench_data_arg is intentionally unquoted: it holds exactly one flag token.
+    local bench_args=(
+      $bench_data_arg "$data_dir"
+      --query-dir "$SCRIPT_DIR/queries"
+      --scale-factor "$sf"
+      --warmup "$warmup"
+      --iterations "$iterations"
+      --output "$output_file"
+    )
+    if [ -n "$queries" ]; then
+      bench_args+=(--queries "$queries")
+    fi
+
+    echo "Running Spark SQL benchmark ($format, native)..."
+    # NOTE(review): if spark-submit fails here, set -e aborts before the
+    # rm -rf "$tmp_dir" at the end — the temp dir leaks; consider a trap.
+    "$SPARK_HOME/bin/spark-submit" \
+      --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:1.1.1 \
+      "${SPARK_ARGS[@]}" \
+      "$SCRIPT_DIR/infra/spark/bench.py" \
+      "${bench_args[@]}"
+  else
+    build_docker
+
+    # In-container paths mirror the volume mounts set up in docker run below.
+    local bench_args=(
+      $bench_data_arg /opt/data
+      --query-dir /opt/queries
+      --scale-factor "$sf"
+      --warmup "$warmup"
+      --iterations "$iterations"
+      --output /opt/output/results.jsonl
+    )
+    if [ -n "$queries" ]; then
+      bench_args+=(--queries "$queries")
+    fi
+
+    echo "Running Spark SQL benchmark ($format, docker)..."
+    local docker_exit=0
+    docker run --rm \
+      -e PYTHONUNBUFFERED=1 \
+      -v "$data_dir:/opt/data:ro" \
+      -v "$SCRIPT_DIR/queries:/opt/queries:ro" \
+      -v "$SCRIPT_DIR/infra/spark/bench.py:/opt/spark/work-dir/bench.py:ro" \
+      -v "$tmp_dir:/opt/output" \
+      "$DOCKER_IMAGE" \
+      /opt/spark/bin/spark-submit "${SPARK_ARGS[@]}" \
+      /opt/spark/work-dir/bench.py \
+      "${bench_args[@]}" \
+      || docker_exit=$?
+
+    if [ $docker_exit -ne 0 ]; then
+      echo "Error: Spark SQL benchmark failed with exit code $docker_exit" >&2
+      rm -rf "$tmp_dir"
+      return $docker_exit
+    fi
+  fi
+
+  # Parse the JSONL results; with --output-dir, also persist labelled JSON
+  # that cmd_compare can later pick up.
+  echo ""
+  local parse_args=(parse-spark-output --input "$output_file")
+  if [ -n "$output_dir" ]; then
+    mkdir -p "$output_dir"
+    parse_args+=(--output-dir "$output_dir" --engine-label spark 
--format-label "$format" --display-name "spark+hudi" --scale-factor "$sf")
+  fi
+  "$TPCH_BIN" "${parse_args[@]}"
+  rm -rf "$tmp_dir"
+}
+
+# Run the TPC-H queries through DataFusion via `tpch bench`, natively or in
+# Docker. With no --format, benchmarks every format whose data dir exists.
+cmd_bench_datafusion() {
+  local sf="$DEFAULT_SCALE_FACTOR"
+  local format=""
+  local queries=""
+  local iterations=""
+  local warmup=""
+  local output_dir=""
+  while [[ $# -gt 0 ]]; do
+    case "$1" in
+      --scale-factor) sf="$2"; shift 2 ;;
+      --format) format="$2"; shift 2 ;;
+      --queries) queries="$2"; shift 2 ;;
+      --iterations) iterations="$2"; shift 2 ;;
+      --warmup) warmup="$2"; shift 2 ;;
+      --output-dir) output_dir="$2"; shift 2 ;;
+      *) echo "Unknown option: $1" >&2; usage; exit 1 ;;
+    esac
+  done
+
+  local hudi_dir="$SCRIPT_DIR/data/sf$sf-hudi"
+  local parquet_dir="$SCRIPT_DIR/data/sf$sf-parquet"
+
+  # Determine which formats to bench
+  local use_hudi=false
+  local use_parquet=false
+  case "$format" in
+    hudi)    use_hudi=true ;;
+    parquet) use_parquet=true ;;
+    "")
+      # Default: use whichever data dirs exist
+      [ -d "$hudi_dir" ] && use_hudi=true
+      [ -d "$parquet_dir" ] && use_parquet=true
+      ;;
+    *) echo "Error: unknown format '$format'. Use 'hudi' or 'parquet'." >&2; 
exit 1 ;;
+  esac
+
+  if [ "$use_hudi" = false ] && [ "$use_parquet" = false ]; then
+    echo "Error: no data found for sf$sf. Run 'generate' and/or 
'create-tables' first." >&2
+    exit 1
+  fi
+  if [ "$use_hudi" = true ] && [ ! -d "$hudi_dir" ]; then
+    echo "Error: Hudi data not found at $hudi_dir. Run 'create-tables' first." 
>&2
+    exit 1
+  fi
+  if [ "$use_parquet" = true ] && [ ! -d "$parquet_dir" ]; then
+    echo "Error: Parquet data not found at $parquet_dir. Run 'generate' 
first." >&2
+    exit 1
+  fi
+
+  if [ "$MODE" = "native" ]; then
+    build_tpch
+
+    local bench_args=(bench --scale-factor "$sf")
+    [ "$use_hudi" = true ] && bench_args+=(--hudi-dir "$hudi_dir")
+    [ "$use_parquet" = true ] && bench_args+=(--parquet-dir "$parquet_dir")
+    [ -n "$queries" ] && bench_args+=(--queries "$queries")
+    [ -n "$iterations" ] && bench_args+=(--iterations "$iterations")
+    [ -n "$warmup" ] && bench_args+=(--warmup "$warmup")
+
+    if [ -n "$output_dir" ]; then
+      mkdir -p "$output_dir"
+      output_dir="$(cd "$output_dir" && pwd)"
+      bench_args+=(--output-dir "$output_dir" --engine-label datafusion 
--format-label "${format:-hudi}" --display-name "datafusion+hudi-rs")
+    fi
+
+    echo "Running DataFusion benchmark (native)..."
+    # Config/query locations are passed via env vars consumed by the binary.
+    TPCH_CONFIG_DIR="$SCRIPT_DIR/config" \
+    TPCH_QUERY_DIR="$SCRIPT_DIR/queries" \
+    RUST_LOG="${RUST_LOG:-warn}" \
+    "$TPCH_BIN" "${bench_args[@]}"
+  else
+    build_docker
+
+    # Resolve output_dir to absolute path (Docker requires it)
+    if [ -n "$output_dir" ]; then
+      mkdir -p "$output_dir"
+      output_dir="$(cd "$output_dir" && pwd)"
+    fi
+
+    # In-container paths mirror the volume mounts assembled below.
+    local bench_args=(bench --scale-factor "$sf")
+    [ "$use_hudi" = true ] && bench_args+=(--hudi-dir /opt/hudi)
+    [ "$use_parquet" = true ] && bench_args+=(--parquet-dir /opt/parquet)
+    [ -n "$queries" ] && bench_args+=(--queries "$queries")
+    [ -n "$iterations" ] && bench_args+=(--iterations "$iterations")
+    [ -n "$warmup" ] && bench_args+=(--warmup "$warmup")
+
+    if [ -n "$output_dir" ]; then
+      bench_args+=(--output-dir /opt/results --engine-label datafusion 
--format-label "${format:-hudi}" --display-name "datafusion+hudi-rs")
+    fi
+
+    echo "Running DataFusion benchmark (docker)..."
+    # Data and query mounts are read-only; only the results mount is writable.
+    local volumes=()
+    [ "$use_hudi" = true ] && volumes+=(-v "$hudi_dir:/opt/hudi:ro")
+    [ "$use_parquet" = true ] && volumes+=(-v "$parquet_dir:/opt/parquet:ro")
+    volumes+=(-v "$SCRIPT_DIR/queries:/opt/queries:ro")
+    volumes+=(-v "$SCRIPT_DIR/config:/opt/config:ro")
+    [ -n "$output_dir" ] && volumes+=(-v "$output_dir:/opt/results")
+
+    docker run --rm \
+      "${volumes[@]}" \
+      -e TPCH_CONFIG_DIR=/opt/config \
+      -e TPCH_QUERY_DIR=/opt/queries \
+      -e RUST_LOG="${RUST_LOG:-warn}" \
+      "$DOCKER_IMAGE" \
+      tpch "${bench_args[@]}"
+  fi
+}
+
+# Compare previously persisted benchmark runs via `tpch compare`. Run names
+# are derived as <engine>_<format>_sf<N> from the --engines list.
+# NOTE(review): reads from $SCRIPT_DIR/results — bench commands must have used
+# an --output-dir that resolves there for runs to be found; confirm.
+cmd_compare() {
+  local sf="$DEFAULT_SCALE_FACTOR"
+  local engines=""
+  local format="hudi"
+  while [[ $# -gt 0 ]]; do
+    case "$1" in
+      --scale-factor) sf="$2"; shift 2 ;;
+      --engines) engines="$2"; shift 2 ;;
+      --format) format="$2"; shift 2 ;;
+      *) echo "Unknown option: $1" >&2; usage; exit 1 ;;
+    esac
+  done
+
+  if [ -z "$engines" ]; then
+    echo "Error: --engines is required (e.g., --engines datafusion,spark)" >&2
+    exit 1
+  fi
+
+  # Convert "datafusion,spark" → "datafusion_hudi_sf1,spark_hudi_sf1"
+  local runs=""
+  IFS=',' read -ra engine_arr <<< "$engines"
+  for e in "${engine_arr[@]}"; do
+    [ -n "$runs" ] && runs+=","
+    runs+="${e}_${format}_sf${sf}"
+  done
+
+  build_tpch
+  "$TPCH_BIN" compare \
+    --results-dir "$SCRIPT_DIR/results" \
+    --runs "$runs"
+}
+
+# --- Main ---
+
+# Require a subcommand as the first argument; everything after it is
+# forwarded unparsed to the matching cmd_* handler.
+if [[ $# -lt 1 ]]; then
+  usage
+  exit 1
+fi
+
+COMMAND="$1"
+shift
+
+case "$COMMAND" in
+  generate)         cmd_generate "$@" ;;
+  create-tables)    cmd_create_tables "$@" ;;
+  bench-spark)      cmd_bench_spark "$@" ;;
+  bench-datafusion) cmd_bench_datafusion "$@" ;;
+  compare)          cmd_compare "$@" ;;
+  *)
+    echo "Unknown command: $COMMAND" >&2
+    usage
+    exit 1
+    ;;
+esac
diff --git a/benchmark/tpch/src/config.rs b/benchmark/tpch/src/config.rs
new file mode 100644
index 0000000..c375e7c
--- /dev/null
+++ b/benchmark/tpch/src/config.rs
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::collections::BTreeMap;
+use std::fmt::Write;
+use std::path::{Path, PathBuf};
+
+use serde::Deserialize;
+
/// Canonical table ordering for SQL output (matches TPC-H dependency order).
const TABLE_ORDER: &[&str] = &[
    "nation", "region", "part", "supplier", "partsupp", "customer", "orders", "lineitem",
];

/// Common table definition shared across all scale factors (from tables.yaml).
#[derive(Deserialize)]
struct CommonTableConfig {
    // Hudi `primaryKey` table property.
    primary_key: String,
    // Hudi `preCombineField` table property.
    pre_combine_field: String,
    // Value for `hoodie.copyonwrite.record.size.estimate`.
    record_size_estimate: u32,
}

/// Common tables file (tables.yaml).
#[derive(Deserialize)]
struct CommonConfig {
    // Table name -> shared table definition.
    tables: BTreeMap<String, CommonTableConfig>,
}

/// Per-scale-factor overrides (sf*.yaml).
#[derive(Deserialize)]
struct ScaleFactorOverrides {
    // Table name -> `hoodie.bulkinsert.shuffle.parallelism` override.
    shuffle_parallelism: BTreeMap<String, u32>,
    // Spark settings used when creating the Hudi tables.
    create_tables: SparkCommandConfig,
    // Benchmark-run settings (warmup/iterations/memory, extra Spark conf).
    bench: BenchConfig,
}

/// Merged table config used at runtime.
pub struct TableConfig {
    pub primary_key: String,
    pub pre_combine_field: String,
    pub record_size_estimate: u32,
    pub shuffle_parallelism: u32,
}

/// Fully merged configuration for one scale factor (tables.yaml + sf{N}.yaml).
pub struct ScaleFactorConfig {
    pub tables: BTreeMap<String, TableConfig>,
    pub create_tables: SparkCommandConfig,
    pub bench: BenchConfig,
}

/// Spark settings for a single spark-submit invocation.
#[derive(Deserialize)]
pub struct SparkCommandConfig {
    // Extra `--conf key=value` pairs; empty map when absent in YAML.
    #[serde(default)]
    pub spark_conf: BTreeMap<String, String>,
}

/// Benchmark execution settings from the per-SF config.
#[derive(Deserialize)]
pub struct BenchConfig {
    // Unmeasured warmup iterations per query (serde default: 0).
    #[serde(default)]
    pub warmup: usize,
    // Measured iterations per query (serde default: 1, see default_iterations).
    #[serde(default = "default_iterations")]
    pub iterations: usize,
    // Optional memory limit string (e.g. "3g"); None means unlimited.
    #[serde(default)]
    pub memory_limit: Option<String>,
    // Extra Spark `--conf` pairs for the bench step.
    #[serde(default)]
    pub spark_conf: BTreeMap<String, String>,
}

/// serde default for `BenchConfig::iterations`.
fn default_iterations() -> usize {
    1
}
+
+impl ScaleFactorConfig {
+    /// Supported scale factors that have config files.
+    const SUPPORTED: &[u64] = &[1, 10, 100, 1000];
+
+    /// Load common table definitions and per-SF overrides, then merge them.
+    pub fn load(scale_factor: f64) -> Result<Self, Box<dyn std::error::Error>> 
{
+        let effective_sf = if scale_factor < 1.0 {
+            1u64
+        } else {
+            let sf = scale_factor as u64;
+            if !Self::SUPPORTED.contains(&sf) {
+                return Err(format!(
+                    "Unsupported scale factor {scale_factor}. Supported: {:?}",
+                    Self::SUPPORTED
+                )
+                .into());
+            }
+            sf
+        };
+
+        let config_dir = std::env::var("TPCH_CONFIG_DIR")
+            .map(PathBuf::from)
+            .unwrap_or_else(|_| 
Path::new(env!("CARGO_MANIFEST_DIR")).join("config"));
+
+        // Load common table definitions
+        let common_path = config_dir.join("tables.yaml");
+        let common_content = std::fs::read_to_string(&common_path)
+            .map_err(|e| format!("Failed to read {}: {e}", 
common_path.display()))?;
+        let common: CommonConfig = serde_yaml::from_str(&common_content)
+            .map_err(|e| format!("Failed to parse tables.yaml: {e}"))?;
+
+        // Load per-SF overrides
+        let sf_filename = format!("sf{effective_sf}.yaml");
+        let sf_path = config_dir.join(&sf_filename);
+        let sf_content = std::fs::read_to_string(&sf_path)
+            .map_err(|e| format!("Failed to read config {}: {e}", 
sf_path.display()))?;
+        let overrides: ScaleFactorOverrides = serde_yaml::from_str(&sf_content)
+            .map_err(|e| format!("Failed to parse config {sf_filename}: 
{e}"))?;
+
+        // Merge: common tables + per-SF shuffle_parallelism
+        let mut tables = BTreeMap::new();
+        for (name, common_table) in common.tables {
+            let shuffle_parallelism = overrides
+                .shuffle_parallelism
+                .get(&name)
+                .copied()
+                .unwrap_or(1);
+            tables.insert(
+                name,
+                TableConfig {
+                    primary_key: common_table.primary_key,
+                    pre_combine_field: common_table.pre_combine_field,
+                    record_size_estimate: common_table.record_size_estimate,
+                    shuffle_parallelism,
+                },
+            );
+        }
+
+        Ok(Self {
+            tables,
+            create_tables: overrides.create_tables,
+            bench: overrides.bench,
+        })
+    }
+
+    /// Generate CTAS SQL for creating Hudi tables from parquet sources.
+    pub fn render_ctas_sql(&self, parquet_base: &str, hudi_base: &str) -> 
String {
+        let mut sql = String::new();
+        for &name in TABLE_ORDER {
+            let Some(table) = self.tables.get(name) else {
+                continue;
+            };
+            writeln!(sql, "CREATE TABLE {name} USING hudi").unwrap();
+            writeln!(sql, "LOCATION '{hudi_base}/{name}'").unwrap();
+            writeln!(sql, "TBLPROPERTIES (").unwrap();
+            writeln!(sql, "  type = 'cow',").unwrap();
+            writeln!(sql, "  primaryKey = '{}',", table.primary_key).unwrap();
+            writeln!(sql, "  preCombineField = '{}',", 
table.pre_combine_field).unwrap();
+            writeln!(sql, "  'hoodie.table.name' = '{name}',").unwrap();
+            writeln!(
+                sql,
+                "  'hoodie.bulkinsert.shuffle.parallelism' = '{}',",
+                table.shuffle_parallelism
+            )
+            .unwrap();
+            writeln!(
+                sql,
+                "  'hoodie.copyonwrite.record.size.estimate' = '{}'",
+                table.record_size_estimate
+            )
+            .unwrap();
+            writeln!(sql, ") AS SELECT * FROM 
parquet.`{parquet_base}/{name}/`;").unwrap();
+            writeln!(sql).unwrap();
+        }
+        sql
+    }
+
+    /// Generate benchmark SQL: table registrations followed by query 
iterations.
+    pub fn render_bench_sql(
+        &self,
+        hudi_base: &str,
+        query_nums: &[usize],
+        iterations: usize,
+        scale_factor: f64,
+    ) -> Result<String, Box<dyn std::error::Error>> {
+        let mut sql = String::new();
+
+        // Register Hudi tables
+        for &name in TABLE_ORDER {
+            if self.tables.contains_key(name) {
+                writeln!(
+                    sql,
+                    "CREATE TABLE {name} USING hudi LOCATION 
'{hudi_base}/{name}';"
+                )
+                .unwrap();
+            }
+        }
+        writeln!(sql).unwrap();
+
+        // Per-SF substitution values (TPC-H spec Section 2.4.11.3: FRACTION = 
0.0001 / SF)
+        let q11_fraction = format!("{:.10}", 0.0001 / scale_factor);
+
+        // Add queries with bench markers
+        let queries_dir = 
Path::new(env!("CARGO_MANIFEST_DIR")).join("queries");
+        for &qn in query_nums {
+            let qfile = queries_dir.join(format!("q{qn}.sql"));
+            let query_sql = std::fs::read_to_string(&qfile)
+                .map_err(|e| format!("Failed to read q{qn}.sql: {e}"))?;
+            let query_sql = query_sql.replace("${Q11_FRACTION}", 
&q11_fraction);
+            for i in 1..=iterations {
+                writeln!(sql).unwrap();
+                writeln!(sql, "SELECT 'BENCH_MARKER q{qn} iter{i}' as 
marker;").unwrap();
+                write!(sql, "{query_sql}").unwrap();
+                if !query_sql.ends_with('\n') {
+                    writeln!(sql).unwrap();
+                }
+            }
+        }
+
+        Ok(sql)
+    }
+
+    /// Generate spark-submit arguments for a given command, one per line.
+    pub fn render_spark_args(&self, command: &str) -> Result<Vec<String>, 
String> {
+        let spark_conf = match command {
+            "create-tables" => &self.create_tables.spark_conf,
+            "bench" => &self.bench.spark_conf,
+            _ => return Err(format!("Unknown command: {command}")),
+        };
+
+        let mut args = vec!["--master".to_string(), "local[*]".to_string()];
+
+        for (key, value) in spark_conf {
+            args.push("--conf".to_string());
+            args.push(format!("{key}={value}"));
+        }
+
+        Ok(args)
+    }
+}
diff --git a/benchmark/tpch/src/datagen.rs b/benchmark/tpch/src/datagen.rs
new file mode 100644
index 0000000..fb4f675
--- /dev/null
+++ b/benchmark/tpch/src/datagen.rs
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use object_store::buffered::BufWriter;
+use object_store::path::Path as ObjectPath;
+use object_store::{ObjectStore, parse_url_opts};
+use parquet::arrow::ArrowWriter;
+use parquet::arrow::async_writer::AsyncArrowWriter;
+use parquet::basic::Compression;
+use parquet::file::properties::WriterProperties;
+use tpchgen::generators::{
+    CustomerGenerator, LineItemGenerator, NationGenerator, OrderGenerator, 
PartGenerator,
+    PartSuppGenerator, RegionGenerator, SupplierGenerator,
+};
+use tpchgen_arrow::{
+    CustomerArrow, LineItemArrow, NationArrow, OrderArrow, PartArrow, 
PartSuppArrow,
+    RecordBatchIterator, RegionArrow, SupplierArrow,
+};
+use url::Url;
+
+use crate::{collect_cloud_env_vars, is_cloud_url};
+
+fn writer_props() -> WriterProperties {
+    WriterProperties::builder()
+        .set_compression(Compression::SNAPPY)
+        .build()
+}
+
+fn write_parquet_local(
+    batches: &mut dyn Iterator<Item = RecordBatch>,
+    schema: &SchemaRef,
+    path: &std::path::Path,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let file = std::fs::File::create(path)?;
+    let mut writer = ArrowWriter::try_new(file, schema.clone(), 
Some(writer_props()))?;
+    for batch in batches {
+        writer.write(&batch)?;
+    }
+    writer.close()?;
+    Ok(())
+}
+
+async fn write_parquet_cloud(
+    batches: &mut dyn Iterator<Item = RecordBatch>,
+    schema: &SchemaRef,
+    store: &Arc<dyn ObjectStore>,
+    path: &ObjectPath,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let buf_writer = BufWriter::new(Arc::clone(store), path.clone());
+    let mut writer = AsyncArrowWriter::try_new(buf_writer, schema.clone(), 
Some(writer_props()))?;
+    for batch in batches {
+        writer.write(&batch).await?;
+    }
+    writer.close().await?;
+    Ok(())
+}
+
+fn generate_table_local(
+    name: &str,
+    output_dir: &std::path::Path,
+    batches: &mut dyn Iterator<Item = RecordBatch>,
+    schema: &SchemaRef,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let table_dir = output_dir.join(name);
+    std::fs::create_dir_all(&table_dir)?;
+    println!("  Generating table: {name} ...");
+    write_parquet_local(batches, schema, &table_dir.join("data.parquet"))?;
+    println!("  Done: {name}");
+    Ok(())
+}
+
+async fn generate_table_cloud(
+    name: &str,
+    store: &Arc<dyn ObjectStore>,
+    base_path: &ObjectPath,
+    batches: &mut dyn Iterator<Item = RecordBatch>,
+    schema: &SchemaRef,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let file_path = 
ObjectPath::from(format!("{base_path}/{name}/data.parquet"));
+    println!("  Generating table: {name} ...");
+    write_parquet_cloud(batches, schema, store, &file_path).await?;
+    println!("  Done: {name}");
+    Ok(())
+}
+
// Expand to code that generates all 8 TPC-H tables by invoking `$gen_fn`
// once per table. `$sf` is the scale factor; any trailing `$ctx` arguments
// are forwarded to `$gen_fn` after the table's batch iterator and schema.
// The expansion must live inside an async fn returning Result (it uses
// `.await?`).
macro_rules! gen_all_tables {
    ($sf:expr, $gen_fn:ident $(, $ctx:expr)*) => {{
        // One lazy factory per table so only one generator is alive at a time.
        let tables: Vec<(&str, Box<dyn FnOnce(f64) -> (Box<dyn Iterator<Item = RecordBatch>>, SchemaRef)>)> = vec![
            // Generators are constructed as `new(sf, 1, 1)` — presumably
            // (scale factor, part, part_count), i.e. a single unpartitioned
            // chunk; confirm against tpchgen documentation.
            ("nation", Box::new(|sf| {
                let it = NationArrow::new(NationGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as Box<dyn Iterator<Item = RecordBatch>>, s)
            })),
            ("region", Box::new(|sf| {
                let it = RegionArrow::new(RegionGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as _, s)
            })),
            ("part", Box::new(|sf| {
                let it = PartArrow::new(PartGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as _, s)
            })),
            ("supplier", Box::new(|sf| {
                let it = SupplierArrow::new(SupplierGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as _, s)
            })),
            ("partsupp", Box::new(|sf| {
                let it = PartSuppArrow::new(PartSuppGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as _, s)
            })),
            ("customer", Box::new(|sf| {
                let it = CustomerArrow::new(CustomerGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as _, s)
            })),
            ("orders", Box::new(|sf| {
                let it = OrderArrow::new(OrderGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as _, s)
            })),
            ("lineitem", Box::new(|sf| {
                let it = LineItemArrow::new(LineItemGenerator::new(sf, 1, 1));
                let s = it.schema().clone(); (Box::new(it) as _, s)
            })),
        ];
        // Tables are processed sequentially; any error aborts the whole run.
        for (name, factory) in tables {
            let (mut batches, schema) = factory($sf);
            $gen_fn(name, &mut *batches, &schema $(, $ctx)*).await?;
        }
    }};
}
+
+async fn do_generate_local(
+    name: &str,
+    batches: &mut dyn Iterator<Item = RecordBatch>,
+    schema: &SchemaRef,
+    output_dir: &std::path::Path,
+) -> Result<(), Box<dyn std::error::Error>> {
+    generate_table_local(name, output_dir, batches, schema)
+}
+
+async fn do_generate_cloud(
+    name: &str,
+    batches: &mut dyn Iterator<Item = RecordBatch>,
+    schema: &SchemaRef,
+    ctx: &(Arc<dyn ObjectStore>, ObjectPath),
+) -> Result<(), Box<dyn std::error::Error>> {
+    generate_table_cloud(name, &ctx.0, &ctx.1, batches, schema).await
+}
+
+pub async fn run_generate(
+    scale_factor: f64,
+    output_dir: &str,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let sf = scale_factor;
+
+    if is_cloud_url(output_dir) {
+        println!("Generating TPC-H data (scale factor {sf}) to {output_dir}");
+
+        let url = Url::parse(output_dir)?;
+        let env_vars = collect_cloud_env_vars();
+        let (store, object_path) = parse_url_opts(&url, env_vars)?;
+        let ctx = (Arc::from(store) as Arc<dyn ObjectStore>, object_path);
+
+        gen_all_tables!(sf, do_generate_cloud, &ctx);
+    } else {
+        let out_path = std::path::Path::new(output_dir);
+        println!(
+            "Generating TPC-H data (scale factor {sf}) into {}",
+            out_path.display()
+        );
+
+        gen_all_tables!(sf, do_generate_local, out_path);
+    }
+
+    println!("All TPC-H tables generated successfully.");
+    Ok(())
+}
diff --git a/benchmark/tpch/src/main.rs b/benchmark/tpch/src/main.rs
new file mode 100644
index 0000000..9c38663
--- /dev/null
+++ b/benchmark/tpch/src/main.rs
@@ -0,0 +1,1268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+mod config;
+mod datagen;
+
+use std::collections::{BTreeMap, BTreeSet, HashMap};
+use std::fs;
+use std::io::BufRead;
+use std::path::Path;
+use std::sync::Arc;
+use std::time::{Instant, SystemTime, UNIX_EPOCH};
+
+use arrow::datatypes::DataType;
+use arrow_array::RecordBatch;
+use arrow_cast::display::{ArrayFormatter, FormatOptions};
+use clap::{Parser, Subcommand};
+use comfy_table::{Cell, Table};
+use datafusion::dataframe::DataFrame;
+use datafusion::error::Result;
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::memory_pool::FairSpillPool;
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::prelude::SessionConfig;
+use hudi::HudiDataSource;
+use serde::{Deserialize, Serialize};
+
/// The 8 TPC-H tables (alphabetical; used when registering DataFusion tables).
const TPCH_TABLES: &[&str] = &[
    "customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier",
];

/// Total number of TPC-H queries (q1.sql .. q22.sql).
const NUM_QUERIES: usize = 22;

/// Cloud URL scheme prefixes recognized by `is_cloud_url`.
const CLOUD_SCHEMES: &[&str] = &["s3://", "s3a://", "gs://", "wasb://", "wasbs://", "az://"];
+
// Top-level CLI parser: `tpch <subcommand> [options]`.
// NOTE: clap derives help text from doc comments, so user-facing strings
// live in attributes / the Commands enum, not here.
#[derive(Parser)]
#[command(name = "tpch", about = "TPC-H benchmark tool for Apache Hudi")]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}
+
// All `tpch` subcommands. The `///` doc comments below double as clap help
// text shown to users — keep them in sync with actual behavior.
// Render* / SparkArgs / BenchDefaults exist so the shell wrapper can drive
// Spark; Bench/Validate run queries in-process via DataFusion.
#[derive(Subcommand)]
enum Commands {
    /// Generate TPC-H parquet data
    Generate {
        /// TPC-H scale factor
        #[arg(long, default_value_t = 1.0)]
        scale_factor: f64,

        /// Output directory (local path or cloud URL); defaults to data/sf{N}-parquet
        #[arg(long)]
        output_dir: Option<String>,
    },
    /// Render CTAS SQL from scale-factor config
    RenderCtas {
        /// TPC-H scale factor (loads config/sf{N}.yaml)
        #[arg(long)]
        scale_factor: f64,

        /// Parquet source base path (e.g., /opt/parquet or gs://bucket/path)
        #[arg(long)]
        parquet_base: String,

        /// Hudi output base path (e.g., /opt/hudi or gs://bucket/path)
        #[arg(long)]
        hudi_base: String,
    },
    /// Render benchmark SQL (table registrations + query iterations)
    RenderBenchSql {
        /// TPC-H scale factor (loads config/sf{N}.yaml)
        #[arg(long)]
        scale_factor: f64,

        /// Hudi tables base path (e.g., /opt/hudi)
        #[arg(long)]
        hudi_base: String,

        /// Comma-separated query numbers (default: all 22)
        #[arg(long)]
        queries: Option<String>,

        /// Number of iterations per query (overrides config)
        #[arg(long)]
        iterations: Option<usize>,
    },
    /// Output spark-submit arguments from scale-factor config (one per line)
    SparkArgs {
        /// TPC-H scale factor (loads config/sf{N}.yaml)
        #[arg(long)]
        scale_factor: f64,

        /// Command profile to use: "create-tables" or "bench"
        #[arg(long)]
        command: String,
    },
    /// Print bench defaults from config (warmup and iterations)
    BenchDefaults {
        /// TPC-H scale factor (loads config/sf{N}.yaml)
        #[arg(long, default_value_t = 1.0)]
        scale_factor: f64,
    },
    /// Run TPC-H benchmark queries via DataFusion (Hudi, Parquet, or both)
    Bench {
        /// Hudi tables location (local path or cloud URL)
        #[arg(long)]
        hudi_dir: Option<String>,

        /// Parquet tables location (local path or cloud URL)
        #[arg(long)]
        parquet_dir: Option<String>,

        /// TPC-H scale factor (used for query parameter substitution and config loading)
        #[arg(long, default_value_t = 1.0)]
        scale_factor: f64,

        /// Comma-separated query numbers to run (e.g., "1,3,6"); defaults to all 22
        #[arg(long)]
        queries: Option<String>,

        /// Number of measured iterations per query (overrides config)
        #[arg(long)]
        iterations: Option<usize>,

        /// Number of unmeasured warmup iterations per query (overrides config)
        #[arg(long)]
        warmup: Option<usize>,

        /// DataFusion memory limit (e.g., "3g", "512m"); unlimited if not set
        #[arg(long)]
        memory_limit: Option<String>,

        /// Directory to persist results as JSON (enables result saving)
        #[arg(long)]
        output_dir: Option<String>,

        /// Engine label for persisted results (e.g., "datafusion")
        #[arg(long)]
        engine_label: Option<String>,

        /// Format label for persisted results (e.g., "hudi"); auto-detected if omitted
        #[arg(long)]
        format_label: Option<String>,

        /// Display name for charts (e.g., "datafusion+hudi-rs"); defaults to engine_label
        #[arg(long)]
        display_name: Option<String>,
    },
    /// Validate Hudi query results against Parquet (runs each query once, compares output)
    Validate {
        /// Hudi tables location (local path or cloud URL)
        #[arg(long)]
        hudi_dir: String,

        /// Parquet tables location (local path or cloud URL)
        #[arg(long)]
        parquet_dir: String,

        /// TPC-H scale factor (used for query parameter substitution)
        #[arg(long, default_value_t = 1.0)]
        scale_factor: f64,

        /// Comma-separated query numbers to run (e.g., "1,3,6"); defaults to all 22
        #[arg(long)]
        queries: Option<String>,

        /// DataFusion memory limit (e.g., "3g", "512m"); unlimited if not set
        #[arg(long)]
        memory_limit: Option<String>,
    },
    /// Parse Spark benchmark JSON output into a timing table
    ParseSparkOutput {
        /// Input file (reads from stdin if omitted)
        #[arg(long)]
        input: Option<String>,

        /// Directory to persist results as JSON
        #[arg(long)]
        output_dir: Option<String>,

        /// Engine label for persisted results (default: "spark")
        #[arg(long)]
        engine_label: Option<String>,

        /// Format label for persisted results (e.g., "hudi")
        #[arg(long)]
        format_label: Option<String>,

        /// Display name for charts (e.g., "spark+hudi"); defaults to engine_label
        #[arg(long)]
        display_name: Option<String>,

        /// TPC-H scale factor (used for result file naming)
        #[arg(long, default_value_t = 1.0)]
        scale_factor: f64,
    },
    /// Compare persisted benchmark results with terminal bar charts
    Compare {
        /// Directory containing result JSON files
        #[arg(long)]
        results_dir: String,

        /// Comma-separated result file stems (e.g., "datafusion_hudi_sf1,spark_hudi_sf1")
        #[arg(long)]
        runs: String,
    },
}
+
+/// Check if a path string is a cloud URL.
+fn is_cloud_url(path: &str) -> bool {
+    CLOUD_SCHEMES.iter().any(|s| path.starts_with(s))
+}
+
+/// Resolve a local path to an absolute path string, or return cloud URL as-is.
+fn resolve_path(path: &str) -> std::result::Result<String, String> {
+    if is_cloud_url(path) {
+        Ok(path.to_string())
+    } else {
+        fs::canonicalize(path)
+            .map(|p| p.to_string_lossy().to_string())
+            .map_err(|e| format!("Failed to resolve path {path}: {e}"))
+    }
+}
+
/// Collect cloud-storage-related environment variables (AWS_*, GOOGLE_*,
/// AZURE_*, OBJECT_STORE_*) as key/value options for object_store.
fn collect_cloud_env_vars() -> Vec<(String, String)> {
    let prefixes = ["AWS_", "GOOGLE_", "AZURE_", "OBJECT_STORE_"];
    let mut opts = Vec::new();
    for (key, value) in std::env::vars() {
        if prefixes.iter().any(|p| key.starts_with(p)) {
            opts.push((key, value));
        }
    }
    opts
}
+
/// Parse a memory size string (e.g., "3g", "512m", "1024k", "2048") into
/// bytes. Suffixes are case-insensitive; a bare number means bytes.
fn parse_memory_size(s: &str) -> std::result::Result<usize, String> {
    let normalized = s.trim().to_lowercase();
    // Suffix table, checked in the same order as the original g/m/k chain.
    let suffixes: [(char, usize); 3] = [('g', 1 << 30), ('m', 1 << 20), ('k', 1 << 10)];
    let mut digits = normalized.as_str();
    let mut scale = 1usize;
    for (suffix, factor) in suffixes {
        if let Some(rest) = normalized.strip_suffix(suffix) {
            digits = rest;
            scale = factor;
            break;
        }
    }
    // Fractional values like "1.5g" are allowed; result truncates to bytes.
    match digits.parse::<f64>() {
        Ok(value) => Ok((value * scale as f64) as usize),
        Err(_) => Err(format!("Invalid memory size: {normalized}")),
    }
}
+
+/// Create a SessionContext, optionally bounded by a memory pool.
+fn create_session_context(
+    memory_limit: Option<&str>,
+) -> std::result::Result<SessionContext, String> {
+    match memory_limit {
+        Some(limit) => {
+            let pool_size = parse_memory_size(limit)?;
+            let pool = FairSpillPool::new(pool_size);
+            let runtime = RuntimeEnvBuilder::default()
+                .with_memory_pool(Arc::new(pool))
+                .build_arc()
+                .map_err(|e| format!("Failed to build runtime: {e}"))?;
+            Ok(SessionContext::new_with_config_rt(
+                SessionConfig::new(),
+                runtime,
+            ))
+        }
+        None => Ok(SessionContext::new()),
+    }
+}
+
+/// Register a cloud object store on the SessionContext's RuntimeEnv.
+fn register_cloud_store(ctx: &SessionContext, base_url: &str) -> Result<()> {
+    let url = url::Url::parse(base_url).map_err(|e| {
+        datafusion::error::DataFusionError::Plan(format!("Invalid URL 
{base_url}: {e}"))
+    })?;
+    let cloud_opts: HashMap<String, String> = 
collect_cloud_env_vars().into_iter().collect();
+    let (store, _) = object_store::parse_url_opts(&url, 
&cloud_opts).map_err(|e| {
+        datafusion::error::DataFusionError::Plan(format!(
+            "Failed to create object store for {base_url}: {e}"
+        ))
+    })?;
+    ctx.runtime_env()
+        .register_object_store(&url, Arc::new(store));
+    Ok(())
+}
+
/// CLI entry point: parse args and dispatch to the chosen subcommand.
/// Errors are surfaced as DataFusionError::Plan for a uniform exit path.
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();
    let cli = Cli::parse();

    match cli.command {
        Commands::Generate {
            scale_factor,
            output_dir,
        } => {
            // Default output dir: <crate>/data/sf{N}-parquet, with the SF
            // label using the integer form when the value is whole.
            let dir = output_dir.unwrap_or_else(|| {
                let sf_label = if scale_factor == scale_factor.floor() && scale_factor >= 1.0 {
                    format!("sf{}", scale_factor as u64)
                } else {
                    format!("sf{scale_factor}")
                };
                let default = Path::new(env!("CARGO_MANIFEST_DIR"))
                    .join("data")
                    .join(format!("{sf_label}-parquet"));
                default.to_string_lossy().to_string()
            });
            // Only local targets need the directory created up front.
            if !is_cloud_url(&dir) {
                std::fs::create_dir_all(&dir).map_err(|e| {
                    datafusion::error::DataFusionError::Plan(format!(
                        "Failed to create output dir {dir}: {e}"
                    ))
                })?;
            }
            datagen::run_generate(scale_factor, &dir)
                .await
                .map_err(|e| {
                    datafusion::error::DataFusionError::Plan(format!("Generation failed: {e}"))
                })
        }
        Commands::RenderCtas {
            scale_factor,
            parquet_base,
            hudi_base,
        } => {
            // Emit CTAS SQL to stdout for the Spark wrapper to consume.
            let cfg = config::ScaleFactorConfig::load(scale_factor)
                .map_err(|e| datafusion::error::DataFusionError::Plan(format!("{e}")))?;
            print!("{}", cfg.render_ctas_sql(&parquet_base, &hudi_base));
            Ok(())
        }
        Commands::RenderBenchSql {
            scale_factor,
            hudi_base,
            queries,
            iterations,
        } => {
            // CLI --iterations overrides the per-SF config value.
            let cfg = config::ScaleFactorConfig::load(scale_factor)
                .map_err(|e| datafusion::error::DataFusionError::Plan(format!("{e}")))?;
            let iterations = iterations.unwrap_or(cfg.bench.iterations);
            let query_nums = parse_query_numbers(queries);
            let sql = cfg
                .render_bench_sql(&hudi_base, &query_nums, iterations, scale_factor)
                .map_err(|e| datafusion::error::DataFusionError::Plan(format!("{e}")))?;
            print!("{sql}");
            Ok(())
        }
        Commands::SparkArgs {
            scale_factor,
            command,
        } => {
            // One argument per line so the shell wrapper can read them.
            let cfg = config::ScaleFactorConfig::load(scale_factor)
                .map_err(|e| datafusion::error::DataFusionError::Plan(format!("{e}")))?;
            let args = cfg
                .render_spark_args(&command)
                .map_err(datafusion::error::DataFusionError::Plan)?;
            for arg in args {
                println!("{arg}");
            }
            Ok(())
        }
        Commands::BenchDefaults { scale_factor } => {
            // Prints "<warmup> <iterations>" for the shell wrapper to parse.
            let cfg = config::ScaleFactorConfig::load(scale_factor)
                .map_err(|e| datafusion::error::DataFusionError::Plan(format!("{e}")))?;
            println!("{} {}", cfg.bench.warmup, cfg.bench.iterations);
            Ok(())
        }
        Commands::Bench {
            hudi_dir,
            parquet_dir,
            scale_factor,
            queries,
            iterations,
            warmup,
            memory_limit,
            output_dir,
            engine_label,
            format_label,
            display_name,
        } => {
            // CLI flags override per-SF config defaults where provided.
            let cfg = config::ScaleFactorConfig::load(scale_factor)
                .map_err(|e| datafusion::error::DataFusionError::Plan(format!("{e}")))?;
            let warmup = warmup.unwrap_or(cfg.bench.warmup);
            let iterations = iterations.unwrap_or(cfg.bench.iterations);
            let memory_limit = memory_limit.or(cfg.bench.memory_limit);
            run_bench(
                hudi_dir.as_deref(),
                parquet_dir.as_deref(),
                scale_factor,
                queries,
                warmup,
                iterations,
                memory_limit.as_deref(),
                output_dir.as_deref(),
                engine_label.as_deref(),
                format_label.as_deref(),
                display_name.as_deref(),
            )
            .await
        }
        Commands::Validate {
            hudi_dir,
            parquet_dir,
            scale_factor,
            queries,
            memory_limit,
        } => {
            run_validate(
                &hudi_dir,
                &parquet_dir,
                scale_factor,
                queries,
                memory_limit.as_deref(),
            )
            .await
        }
        Commands::ParseSparkOutput {
            input,
            output_dir,
            engine_label,
            format_label,
            display_name,
            scale_factor,
        } => run_parse_spark_output(
            input.as_deref(),
            output_dir.as_deref(),
            engine_label.as_deref(),
            format_label.as_deref(),
            display_name.as_deref(),
            scale_factor,
        ),
        Commands::Compare { results_dir, runs } => run_compare(&results_dir, &runs),
    }
}
+
+/// Parse query numbers from the user-provided comma-separated string, or 
return all 22.
+fn parse_query_numbers(queries: Option<String>) -> Vec<usize> {
+    match queries {
+        Some(s) => s
+            .split(',')
+            .filter_map(|q| q.trim().parse::<usize>().ok())
+            .filter(|&q| (1..=NUM_QUERIES).contains(&q))
+            .collect(),
+        None => (1..=NUM_QUERIES).collect(),
+    }
+}
+
/// Load a SQL query file, applying scale-factor-dependent substitutions.
///
/// The query directory defaults to `<crate root>/queries` but can be
/// overridden with the `TPCH_QUERY_DIR` environment variable.
/// Returns a human-readable error string if the file cannot be read.
fn load_query(query_num: usize, scale_factor: f64) -> std::result::Result<String, String> {
    let cache_dir = std::env::var("TPCH_QUERY_DIR")
        .map(std::path::PathBuf::from)
        .unwrap_or_else(|_| Path::new(env!("CARGO_MANIFEST_DIR")).join("queries"));
    let file_name = format!("q{query_num}.sql");
    let path = cache_dir.join(&file_name);
    let sql = fs::read_to_string(&path).map_err(|e| format!("Failed to read {file_name}: {e}"))?;
    // Q11's selectivity parameter scales inversely with SF (0.0001 / SF);
    // substituting the placeholder is a no-op for the other queries.
    let q11_fraction = format!("{:.10}", 0.0001 / scale_factor);
    Ok(sql.replace("${Q11_FRACTION}", &q11_fraction))
}
+
/// Register all 8 TPC-H Hudi tables. Supports local paths and cloud URLs.
///
/// For cloud locations the table URI is built by plain segment join; local
/// paths are converted to `file://` URLs (which requires them to be
/// absolute — `Url::from_file_path` rejects relative paths).
async fn register_hudi_tables(ctx: &SessionContext, base_dir: &str) -> Result<()> {
    let resolved = resolve_path(base_dir).map_err(datafusion::error::DataFusionError::Plan)?;

    for table_name in TPCH_TABLES {
        let table_uri = if is_cloud_url(&resolved) {
            // Avoid a double slash if the base URL has a trailing '/'.
            format!("{}/{table_name}", resolved.trim_end_matches('/'))
        } else {
            let table_path = Path::new(&resolved).join(table_name);
            url::Url::from_file_path(&table_path)
                .map_err(|_| {
                    datafusion::error::DataFusionError::Plan(format!(
                        "Failed to create file URL for {}",
                        table_path.display()
                    ))
                })?
                .to_string()
        };
        let hudi = HudiDataSource::new(&table_uri).await?;
        ctx.register_table(*table_name, Arc::new(hudi))?;
    }
    Ok(())
}
+
/// Register all 8 TPC-H parquet tables. Supports local paths and cloud URLs.
///
/// Unlike the Hudi path, local directories are passed to DataFusion as plain
/// filesystem paths (no `file://` scheme needed by `register_parquet`).
async fn register_parquet_tables(ctx: &SessionContext, base_dir: &str) -> Result<()> {
    let resolved = resolve_path(base_dir).map_err(datafusion::error::DataFusionError::Plan)?;

    // Cloud URLs need an object store registered on the context up front.
    if is_cloud_url(&resolved) {
        register_cloud_store(ctx, &resolved)?;
    }

    for table_name in TPCH_TABLES {
        let table_path = if is_cloud_url(&resolved) {
            format!("{}/{table_name}", resolved.trim_end_matches('/'))
        } else {
            Path::new(&resolved)
                .join(table_name)
                .to_string_lossy()
                .to_string()
        };
        ctx.register_parquet(*table_name, &table_path, Default::default())
            .await?;
    }
    Ok(())
}
+
/// Collect a DataFrame into a Vec of RecordBatches.
///
/// Thin wrapper so timing in `bench_source` covers full plan execution and
/// call sites read uniformly.
async fn collect_results(df: DataFrame) -> Result<Vec<RecordBatch>> {
    df.collect().await
}
+
/// Benchmark a single source (hudi or parquet) and return per-query timings and last batches.
///
/// Runs each selected query `warmup + iterations` times; only the measured
/// (post-warmup) runs contribute to `timings_ms`. The timing for a run
/// covers executing *all* statements of the query (relevant for
/// multi-statement queries like Q15). On a statement failure the remaining
/// runs for that query are skipped and the error is recorded; one
/// `QueryResult` is always pushed per requested query.
async fn bench_source(
    ctx: &SessionContext,
    query_nums: &[usize],
    warmup: usize,
    iterations: usize,
    scale_factor: f64,
) -> Vec<QueryResult> {
    let total_runs = warmup + iterations;
    let mut results = Vec::new();

    for query_num in query_nums {
        let sql = match load_query(*query_num, scale_factor) {
            Ok(s) => s,
            Err(e) => {
                // Record the load failure but keep benchmarking other queries.
                results.push(QueryResult {
                    query_num: *query_num,
                    timings_ms: vec![],
                    last_batches: vec![],
                    error: Some(e),
                });
                continue;
            }
        };

        let mut timings_ms: Vec<f64> = Vec::with_capacity(iterations);
        let mut last_batches: Vec<RecordBatch> = Vec::new();
        let mut error = None;

        // Strip SQL comment lines before splitting, so semicolons inside
        // comments (e.g., license headers) don't produce spurious empty statements.
        let sql_no_comments: String = sql
            .lines()
            .filter(|line| !line.trim_start().starts_with("--"))
            .collect::<Vec<_>>()
            .join("\n");

        // Split multi-statement queries (e.g., Q15: CREATE VIEW; SELECT; DROP VIEW)
        let statements: Vec<&str> = sql_no_comments
            .split(';')
            .map(|s| s.trim())
            .filter(|s| !s.is_empty())
            .collect();

        for i in 0..total_runs {
            if i < warmup {
                print!("  Q{:02} warmup {}/{}...", query_num, i + 1, warmup);
            } else {
                print!(
                    "  Q{:02} iter {}/{}...",
                    query_num,
                    i - warmup + 1,
                    iterations
                );
            }

            let start = Instant::now();
            let mut iter_error = None;
            let mut iter_batches = Vec::new();

            for stmt in &statements {
                match ctx.sql(stmt).await {
                    Ok(df) => match collect_results(df).await {
                        Ok(batches) => {
                            // Keep batches from the last statement that produced
                            // any (the SELECT); DDL statements return nothing.
                            if !batches.is_empty() {
                                iter_batches = batches;
                            }
                        }
                        Err(e) => {
                            iter_error = Some(format!("{e}"));
                            break;
                        }
                    },
                    Err(e) => {
                        iter_error = Some(format!("{e}"));
                        break;
                    }
                }
            }

            let elapsed = start.elapsed().as_secs_f64() * 1000.0;

            if let Some(e) = iter_error {
                println!(" ERROR");
                error = Some(e);
                // Abort remaining runs for this query.
                break;
            }

            println!(" {elapsed:.1}ms");

            if i >= warmup {
                timings_ms.push(elapsed);
            }
            // Only the final run's batches are kept, for validation use.
            if i == total_runs - 1 {
                last_batches = iter_batches;
            }
        }

        results.push(QueryResult {
            query_num: *query_num,
            timings_ms,
            last_batches,
            error,
        });
    }

    results
}
+
/// Outcome of benchmarking a single TPC-H query against one source.
struct QueryResult {
    /// 1-based TPC-H query number.
    query_num: usize,
    /// Measured (post-warmup) wall-clock times in milliseconds.
    timings_ms: Vec<f64>,
    /// Record batches from the final run, kept for result validation.
    last_batches: Vec<RecordBatch>,
    /// Error message if a statement failed; timings may then be partial.
    error: Option<String>,
}
+
/// Aggregate timing statistics over one query's measured iterations (ms).
struct TimingStats {
    /// Fastest iteration.
    min: f64,
    /// Median across iterations (midpoint of the two central values when
    /// the count is even).
    median: f64,
    /// Arithmetic mean across iterations.
    mean: f64,
    /// Slowest iteration.
    max: f64,
}

/// Compute min/median/mean/max over the given timings.
///
/// Returns `None` for an empty slice. The sort comparator treats
/// incomparable values (NaN) as equal instead of panicking — the previous
/// `partial_cmp(..).unwrap()` would abort the whole benchmark run if a
/// NaN ever slipped into the timings.
fn compute_stats(timings: &[f64]) -> Option<TimingStats> {
    if timings.is_empty() {
        return None;
    }
    let mut sorted = timings.to_vec();
    // NaN-safe: fall back to Equal rather than panicking on unwrap.
    sorted.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
    let min = sorted[0];
    let max = sorted[sorted.len() - 1];
    let mean = sorted.iter().sum::<f64>() / sorted.len() as f64;
    let median = if sorted.len() % 2 == 0 {
        let mid = sorted.len() / 2;
        (sorted[mid - 1] + sorted[mid]) / 2.0
    } else {
        sorted[sorted.len() / 2]
    };
    Some(TimingStats {
        min,
        median,
        mean,
        max,
    })
}
+
/// Per-query summary statistics as persisted in the results JSON file.
#[derive(Serialize, Deserialize)]
struct PersistedQueryStats {
    /// Mean of the measured iterations, in milliseconds.
    avg_ms: f64,
    /// Fastest measured iteration, in milliseconds.
    min_ms: f64,
    /// Median of the measured iterations, in milliseconds.
    median_ms: f64,
    /// Slowest measured iteration, in milliseconds.
    max_ms: f64,
}
+
/// Top-level record of one benchmark run, as persisted to JSON.
#[derive(Serialize, Deserialize)]
struct PersistedResults {
    /// Engine that produced the numbers (e.g. "datafusion", "spark").
    engine: String,
    /// Optional human-friendly label; `serde(default)` tolerates result
    /// files that predate this field.
    #[serde(default)]
    display_name: Option<String>,
    /// Table format benchmarked (e.g. "hudi", "parquet").
    format: String,
    /// TPC-H scale factor the run was executed at.
    scale_factor: f64,
    /// Unix epoch seconds when the results were saved.
    timestamp: u64,
    /// Query number (stringified key) -> summary statistics.
    queries: BTreeMap<String, PersistedQueryStats>,
}

impl PersistedResults {
    /// Label shown in comparison output: display name if set, else engine id.
    fn label(&self) -> &str {
        self.display_name.as_deref().unwrap_or(&self.engine)
    }
}
+
/// Render a scale factor as a compact label: 1.0 -> "sf1", 0.1 -> "sf0.1".
fn format_sf_label(sf: f64) -> String {
    // Whole-number scale factors >= 1 drop the decimal point entirely.
    if sf >= 1.0 && sf.floor() == sf {
        return format!("sf{}", sf as u64);
    }
    format!("sf{sf}")
}
+
/// Persist per-query stats to `{output_dir}/{engine}_{format}_{sf}.json`.
///
/// Queries that errored, or have no measured timings, are omitted from the
/// file rather than written with misleading numbers. Returns a
/// human-readable error string on any I/O or serialization failure.
fn save_results(
    results: &[QueryResult],
    engine: &str,
    display_name: Option<&str>,
    format_name: &str,
    scale_factor: f64,
    output_dir: &str,
) -> std::result::Result<(), String> {
    let mut queries = BTreeMap::new();
    for r in results {
        // Skip failed queries entirely.
        if r.error.is_some() {
            continue;
        }
        // compute_stats returns None for empty timings, so those are
        // skipped too.
        if let Some(stats) = compute_stats(&r.timings_ms) {
            queries.insert(
                r.query_num.to_string(),
                PersistedQueryStats {
                    avg_ms: stats.mean,
                    min_ms: stats.min,
                    median_ms: stats.median,
                    max_ms: stats.max,
                },
            );
        }
    }

    // Seconds since the Unix epoch; 0 if the system clock is before 1970.
    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();

    let persisted = PersistedResults {
        engine: engine.to_string(),
        display_name: display_name.map(|s| s.to_string()),
        format: format_name.to_string(),
        scale_factor,
        timestamp,
        queries,
    };

    fs::create_dir_all(output_dir)
        .map_err(|e| format!("Failed to create output dir {output_dir}: {e}"))?;

    let sf_label = format_sf_label(scale_factor);
    let filename = format!("{engine}_{format_name}_{sf_label}.json");
    let path = Path::new(output_dir).join(&filename);

    let json = serde_json::to_string_pretty(&persisted)
        .map_err(|e| format!("Failed to serialize results: {e}"))?;
    fs::write(&path, json).map_err(|e| format!("Failed to write {}: {e}", path.display()))?;

    println!("Results saved to {}", path.display());
    Ok(())
}
+
+fn load_results(path: &str) -> std::result::Result<PersistedResults, String> {
+    let content = fs::read_to_string(path).map_err(|e| format!("Failed to read 
{path}: {e}"))?;
+    serde_json::from_str(&content).map_err(|e| format!("Failed to parse 
{path}: {e}"))
+}
+
/// Run the benchmark against Hudi, Parquet, or both.
///
/// Each source gets a fresh `SessionContext`, so registrations and state
/// from one source cannot affect the other's timings. When `output_dir`
/// is given, per-source stats are persisted via `save_results`.
#[allow(clippy::too_many_arguments)]
async fn run_bench(
    hudi_dir: Option<&str>,
    parquet_dir: Option<&str>,
    scale_factor: f64,
    queries: Option<String>,
    warmup: usize,
    iterations: usize,
    memory_limit: Option<&str>,
    output_dir: Option<&str>,
    engine_label: Option<&str>,
    format_label: Option<&str>,
    display_name: Option<&str>,
) -> Result<()> {
    if hudi_dir.is_none() && parquet_dir.is_none() {
        return Err(datafusion::error::DataFusionError::Plan(
            "At least one of --hudi-dir or --parquet-dir must be provided".to_string(),
        ));
    }

    let query_nums = parse_query_numbers(queries);

    if let Some(limit) = memory_limit {
        println!("DataFusion memory limit: {limit}");
    }
    println!("Warmup: {warmup} iteration(s), Measured: {iterations} iteration(s)");

    if let Some(hudi_dir) = hudi_dir {
        let ctx = create_session_context(memory_limit)
            .map_err(datafusion::error::DataFusionError::Plan)?;
        println!("Registering Hudi tables from {hudi_dir}");
        register_hudi_tables(&ctx, hudi_dir).await?;
        println!("Benchmarking Hudi...");
        let results = bench_source(&ctx, &query_nums, warmup, iterations, scale_factor).await;
        print_single_table("Hudi", &results);
        if let Some(dir) = output_dir {
            // Labels default to DataFusion-on-Hudi unless overridden via CLI.
            let engine = engine_label.unwrap_or("datafusion");
            let fmt = format_label.unwrap_or("hudi");
            save_results(&results, engine, display_name, fmt, scale_factor, dir)
                .map_err(datafusion::error::DataFusionError::Plan)?;
        }
    }

    if let Some(parquet_dir) = parquet_dir {
        let ctx = create_session_context(memory_limit)
            .map_err(datafusion::error::DataFusionError::Plan)?;
        println!("Registering Parquet tables from {parquet_dir}");
        register_parquet_tables(&ctx, parquet_dir).await?;
        println!("Benchmarking Parquet...");
        let results = bench_source(&ctx, &query_nums, warmup, iterations, scale_factor).await;
        print_single_table("Parquet", &results);
        if let Some(dir) = output_dir {
            let engine = engine_label.unwrap_or("datafusion");
            let fmt = format_label.unwrap_or("parquet");
            save_results(&results, engine, display_name, fmt, scale_factor, dir)
                .map_err(datafusion::error::DataFusionError::Plan)?;
        }
    }

    Ok(())
}
+
/// Run validation: query both Hudi and Parquet once, compare results.
///
/// Uses separate session contexts for the two sources and a single
/// un-warmed run each; the timings shown are informational only.
async fn run_validate(
    hudi_dir: &str,
    parquet_dir: &str,
    scale_factor: f64,
    queries: Option<String>,
    memory_limit: Option<&str>,
) -> Result<()> {
    let query_nums = parse_query_nums_helper_free(queries);

    if let Some(limit) = memory_limit {
        println!("DataFusion memory limit: {limit}");
    }

    println!("Registering Hudi tables from {hudi_dir}");
    let hudi_ctx =
        create_session_context(memory_limit).map_err(datafusion::error::DataFusionError::Plan)?;
    register_hudi_tables(&hudi_ctx, hudi_dir).await?;

    println!("Registering Parquet tables from {parquet_dir}");
    let parquet_ctx =
        create_session_context(memory_limit).map_err(datafusion::error::DataFusionError::Plan)?;
    register_parquet_tables(&parquet_ctx, parquet_dir).await?;

    println!("Running Hudi queries...");
    // warmup=0, iterations=1: a single measured run per query.
    let hudi_results = bench_source(&hudi_ctx, &query_nums, 0, 1, scale_factor).await;

    println!("Running Parquet queries...");
    let parquet_results = bench_source(&parquet_ctx, &query_nums, 0, 1, scale_factor).await;

    print_validation_table(&query_nums, &hudi_results, &parquet_results);

    Ok(())
}
+
/// Parse Spark benchmark JSON output into a timing table.
///
/// Reads from `input` if given, otherwise from stdin (so it can be piped
/// straight from the Spark bench script). Optionally persists the parsed
/// stats like a native benchmark run.
fn run_parse_spark_output(
    input: Option<&str>,
    output_dir: Option<&str>,
    engine_label: Option<&str>,
    format_label: Option<&str>,
    display_name: Option<&str>,
    scale_factor: f64,
) -> Result<()> {
    let reader: Box<dyn BufRead> = match input {
        Some(path) => {
            let file = fs::File::open(path).map_err(|e| {
                datafusion::error::DataFusionError::Plan(format!("Failed to open {path}: {e}"))
            })?;
            Box::new(std::io::BufReader::new(file))
        }
        None => Box::new(std::io::BufReader::new(std::io::stdin())),
    };

    let results = parse_spark_timings(reader);
    if results.is_empty() {
        println!("No benchmark data found in input.");
    } else {
        print_single_table("Spark", &results);
        if let Some(dir) = output_dir {
            // Labels default to Spark-on-Hudi unless overridden via CLI.
            let engine = engine_label.unwrap_or("spark");
            let fmt = format_label.unwrap_or("hudi");
            save_results(&results, engine, display_name, fmt, scale_factor, dir)
                .map_err(datafusion::error::DataFusionError::Plan)?;
        }
    }
    Ok(())
}
+
+/// Parse JSON lines from the PySpark bench script.
+///
+/// Each line is: {"query": N, "elapsed_ms": X.X}
+/// Warmup iterations are already excluded by the Python script.
+fn parse_spark_timings(reader: Box<dyn BufRead>) -> Vec<QueryResult> {
+    let mut all_timings: BTreeMap<usize, Vec<f64>> = BTreeMap::new();
+
+    for line in reader.lines().map_while(|l| l.ok()) {
+        if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line) {
+            if let (Some(q), Some(ms)) = (v["query"].as_u64(), 
v["elapsed_ms"].as_f64()) {
+                all_timings.entry(q as usize).or_default().push(ms);
+            }
+        }
+    }
+
+    all_timings
+        .into_iter()
+        .map(|(q, times)| QueryResult {
+            query_num: q,
+            timings_ms: times,
+            last_batches: vec![],
+            error: None,
+        })
+        .collect()
+}
+
/// Compare persisted benchmark results and render terminal bar charts.
///
/// `runs` is a comma-separated list of result-file stems (without `.json`)
/// under `results_dir`. Bars are scaled against the global maximum average
/// time across all runs and queries.
fn run_compare(results_dir: &str, runs: &str) -> Result<()> {
    let stems: Vec<&str> = runs.split(',').map(|s| s.trim()).collect();
    // NOTE(review): split() always yields at least one element, so this
    // check never fires; an empty `runs` surfaces later as a load error.
    if stems.is_empty() {
        return Err(datafusion::error::DataFusionError::Plan(
            "No runs specified".to_string(),
        ));
    }

    let mut loaded: Vec<PersistedResults> = Vec::new();
    for stem in &stems {
        let path = format!("{results_dir}/{stem}.json");
        let r = load_results(&path).map_err(datafusion::error::DataFusionError::Plan)?;
        loaded.push(r);
    }

    // Collect all query numbers across all runs
    let mut all_queries = BTreeSet::new();
    for r in &loaded {
        for q in r.queries.keys() {
            if let Ok(n) = q.parse::<usize>() {
                all_queries.insert(n);
            }
        }
    }

    if all_queries.is_empty() {
        println!("No query data found in the provided result files.");
        return Ok(());
    }

    // Find global max avg_ms for bar scaling
    let global_max = loaded
        .iter()
        .flat_map(|r| r.queries.values().map(|s| s.avg_ms))
        .fold(0.0_f64, f64::max);

    if global_max == 0.0 {
        println!("All query timings are zero.");
        return Ok(());
    }

    let bar_width: usize = 40;
    let engine_names: Vec<&str> = loaded.iter().map(|r| r.label()).collect();
    let max_name_len = engine_names.iter().map(|n| n.len()).max().unwrap_or(0);

    println!();
    println!("TPC-H Query Runtime Comparison");
    println!("{}", "=".repeat(max_name_len + 6 + bar_width + 16));
    println!();

    // One group of bars per query; the query label only appears on the
    // first run's row of each group.
    for q in &all_queries {
        let q_str = q.to_string();
        for (i, r) in loaded.iter().enumerate() {
            let label = if i == 0 {
                format!("Q{q:02}  {:<width$}", r.label(), width = max_name_len)
            } else {
                format!("     {:<width$}", r.label(), width = max_name_len)
            };

            if let Some(stats) = r.queries.get(&q_str) {
                let filled = ((stats.avg_ms / global_max) * bar_width as f64).round() as usize;
                let filled = filled.min(bar_width);
                let empty = bar_width - filled;
                println!(
                    "{label} |{}{} | {:>9.1} ms",
                    "\u{2588}".repeat(filled),
                    " ".repeat(empty),
                    stats.avg_ms,
                );
            } else {
                // This run has no data for this query.
                println!("{label} |{} |       N/A", " ".repeat(bar_width),);
            }
        }
        println!();
    }

    // Summary: Total and Geometric Mean as bar charts
    let mut totals: Vec<(String, f64)> = Vec::new();
    let mut geomeans: Vec<(String, f64)> = Vec::new();
    for r in &loaded {
        let total: f64 = r.queries.values().map(|s| s.avg_ms).sum();
        totals.push((r.label().to_string(), total));

        // Geometric mean via log-sum; only defined for strictly positive data.
        let values: Vec<f64> = r.queries.values().map(|s| s.avg_ms).collect();
        if !values.is_empty() && values.iter().all(|v| *v > 0.0) {
            let log_sum: f64 = values.iter().map(|v| v.ln()).sum::<f64>();
            let geomean = (log_sum / values.len() as f64).exp();
            geomeans.push((r.label().to_string(), geomean));
        }
    }

    println!("Summary");
    println!("{}", "-".repeat(max_name_len + 6 + bar_width + 16));
    println!();

    // Total runtime bars
    let total_max = totals.iter().map(|(_, v)| *v).fold(0.0_f64, f64::max);
    if total_max > 0.0 {
        for (i, (engine, total)) in totals.iter().enumerate() {
            let label = if i == 0 {
                format!("Tot  {engine:<max_name_len$}")
            } else {
                format!("     {engine:<max_name_len$}")
            };
            let filled = ((total / total_max) * bar_width as f64).round() as usize;
            let filled = filled.min(bar_width);
            let empty = bar_width - filled;
            println!(
                "{label} |{}{} | {:>9.1} ms",
                "\u{2588}".repeat(filled),
                " ".repeat(empty),
                total,
            );
        }
        println!();
    }

    // Geometric mean bars
    if !geomeans.is_empty() {
        let geomean_max = geomeans.iter().map(|(_, v)| *v).fold(0.0_f64, f64::max);
        if geomean_max > 0.0 {
            for (i, (engine, geomean)) in geomeans.iter().enumerate() {
                let label = if i == 0 {
                    format!("Geo  {engine:<max_name_len$}")
                } else {
                    format!("     {engine:<max_name_len$}")
                };
                let filled = ((geomean / geomean_max) * bar_width as f64).round() as usize;
                let filled = filled.min(bar_width);
                let empty = bar_width - filled;
                println!(
                    "{label} |{}{} | {:>9.1} ms",
                    "\u{2588}".repeat(filled),
                    " ".repeat(empty),
                    geomean,
                );
            }
            println!();
        }
    }

    Ok(())
}
+
/// Render a per-query min/median/mean/max timing table for one source.
///
/// Queries that neither errored nor produced any timings are silently
/// omitted (compute_stats returns None for empty timings).
fn print_single_table(label: &str, results: &[QueryResult]) {
    let mut table = Table::new();
    table.set_header(vec![
        Cell::new("Query"),
        Cell::new(format!("{label} Min (ms)")),
        Cell::new(format!("{label} Median (ms)")),
        Cell::new(format!("{label} Mean (ms)")),
        Cell::new(format!("{label} Max (ms)")),
        Cell::new("Status"),
    ]);

    for r in results {
        if let Some(ref e) = r.error {
            table.add_row(vec![
                Cell::new(format!("Q{:02}", r.query_num)),
                Cell::new("-"),
                Cell::new("-"),
                Cell::new("-"),
                Cell::new("-"),
                Cell::new(format!("ERROR: {e}")),
            ]);
        } else if let Some(stats) = compute_stats(&r.timings_ms) {
            table.add_row(vec![
                Cell::new(format!("Q{:02}", r.query_num)),
                Cell::new(format!("{:.1}", stats.min)),
                Cell::new(format!("{:.1}", stats.median)),
                Cell::new(format!("{:.1}", stats.mean)),
                Cell::new(format!("{:.1}", stats.max)),
                Cell::new("OK"),
            ]);
        }
    }

    println!("{table}");
}
+
/// Render a side-by-side Hudi-vs-Parquet validation table.
///
/// Assumes `hudi_results` and `parquet_results` are positionally aligned
/// with `query_nums` — bench_source pushes exactly one result per
/// requested query, so index `i` corresponds to `query_nums[i]`.
fn print_validation_table(
    query_nums: &[usize],
    hudi_results: &[QueryResult],
    parquet_results: &[QueryResult],
) {
    let mut table = Table::new();
    table.set_header(vec![
        Cell::new("Query"),
        Cell::new("Hudi (ms)"),
        Cell::new("Parquet (ms)"),
        Cell::new("Result"),
    ]);

    for (i, qn) in query_nums.iter().enumerate() {
        let hr = &hudi_results[i];
        let pr = &parquet_results[i];

        let h_err = hr.error.as_deref();
        let p_err = pr.error.as_deref();

        // If either side failed, report the error instead of comparing;
        // the Hudi error wins when both sides failed.
        if h_err.is_some() || p_err.is_some() {
            let err_msg = h_err.or(p_err).unwrap_or("unknown error");
            table.add_row(vec![
                Cell::new(format!("Q{qn:02}")),
                Cell::new(if h_err.is_some() { "-" } else { "OK" }),
                Cell::new(if p_err.is_some() { "-" } else { "OK" }),
                Cell::new(format!("ERROR: {err_msg}")),
            ]);
            continue;
        }

        let h_ms = hr
            .timings_ms
            .first()
            .map(|t| format!("{t:.1}"))
            .unwrap_or("-".into());
        let p_ms = pr
            .timings_ms
            .first()
            .map(|t| format!("{t:.1}"))
            .unwrap_or("-".into());
        let validation = compare_batches(&hr.last_batches, &pr.last_batches);

        table.add_row(vec![
            Cell::new(format!("Q{qn:02}")),
            Cell::new(h_ms),
            Cell::new(p_ms),
            Cell::new(validation),
        ]);
    }

    println!("{table}");
}
+
+/// Compare two sets of record batches for correctness validation.
+fn compare_batches(actual: &[RecordBatch], expected: &[RecordBatch]) -> String 
{
+    let actual_rows = match batches_to_csv_rows(actual) {
+        Ok(r) => r,
+        Err(e) => return format!("ERROR: {e}"),
+    };
+    let expected_rows = match batches_to_csv_rows(expected) {
+        Ok(r) => r,
+        Err(e) => return format!("ERROR: {e}"),
+    };
+
+    if actual_rows.len() != expected_rows.len() {
+        return format!(
+            "FAIL (rows: {} vs {})",
+            actual_rows.len(),
+            expected_rows.len()
+        );
+    }
+
+    let mut actual_sorted = actual_rows;
+    actual_sorted.sort();
+    let mut expected_sorted = expected_rows;
+    expected_sorted.sort();
+
+    for (i, (a, e)) in 
actual_sorted.iter().zip(expected_sorted.iter()).enumerate() {
+        if !rows_match(a, e) {
+            return format!("FAIL (row {i} mismatch)");
+        }
+    }
+
+    "PASS".to_string()
+}
+
/// Compare two CSV row strings, using tolerance for floating-point values.
///
/// Cells that are textually identical match immediately; otherwise both
/// cells must parse as f64 and agree within a relative tolerance of 1e-6
/// (absolute 1e-10 when both are zero-magnitude).
fn rows_match(actual: &str, expected: &str) -> bool {
    let lhs: Vec<&str> = actual.split(',').collect();
    let rhs: Vec<&str> = expected.split(',').collect();

    if lhs.len() != rhs.len() {
        return false;
    }

    lhs.iter().zip(rhs.iter()).all(|(a, e)| {
        // Exact textual match covers strings, dates, and identical numerics.
        if a == e {
            return true;
        }
        match (a.parse::<f64>(), e.parse::<f64>()) {
            (Ok(av), Ok(ev)) => {
                let diff = (av - ev).abs();
                let scale = av.abs().max(ev.abs());
                if scale == 0.0 {
                    diff <= 1e-10
                } else {
                    diff / scale <= 1e-6
                }
            }
            // At least one cell is non-numeric and they differ textually.
            _ => false,
        }
    })
}
+
+/// Convert record batches to CSV-like row strings for comparison.
+fn batches_to_csv_rows(batches: &[RecordBatch]) -> 
std::result::Result<Vec<String>, String> {
+    let mut rows = Vec::new();
+    let fmt_opts = FormatOptions::default();
+
+    for batch in batches {
+        let formatters: Vec<ArrayFormatter> = batch
+            .columns()
+            .iter()
+            .map(|col| ArrayFormatter::try_new(col.as_ref(), &fmt_opts))
+            .collect::<std::result::Result<Vec<_>, _>>()
+            .map_err(|e| format!("Failed to create formatter: {e}"))?;
+
+        for row_idx in 0..batch.num_rows() {
+            let cols: Vec<String> = batch
+                .schema()
+                .fields()
+                .iter()
+                .enumerate()
+                .map(|(col_idx, field)| {
+                    if batch.column(col_idx).is_null(row_idx) {
+                        return "".to_string();
+                    }
+                    match field.data_type() {
+                        DataType::Float32 | DataType::Float64 | 
DataType::Decimal128(_, _) => {
+                            formatters[col_idx].value(row_idx).to_string()
+                        }
+                        _ => formatters[col_idx].value(row_idx).to_string(),
+                    }
+                })
+                .collect();
+            rows.push(cols.join(","));
+        }
+    }
+
+    Ok(rows)
+}

Reply via email to