This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new a2a9467e9 chore: Add Iceberg TPC-H benchmarking scripts (#3294)
a2a9467e9 is described below
commit a2a9467e93aab51d30e8f226a55b8e456190f045
Author: Andy Grove <[email protected]>
AuthorDate: Mon Feb 9 08:53:35 2026 -0700
chore: Add Iceberg TPC-H benchmarking scripts (#3294)
* [WIP] Add Iceberg TPC-H benchmarking scripts
Add scripts to benchmark TPC-H queries against Iceberg tables using
Comet's native iceberg-rust integration:
- create-iceberg-tpch.py: Convert Parquet TPC-H data to Iceberg tables
- tpcbench-iceberg.py: Run TPC-H queries against Iceberg catalog tables
- comet-tpch-iceberg.sh: Shell script to run the benchmark with Comet
Also updates README.md with Iceberg benchmarking documentation.
Co-Authored-By: Claude Opus 4.5 <[email protected]>
* fix
* fix
* Consolidate Parquet and Iceberg benchmark scripts into single tpcbench.py
Merge tpcbench-iceberg.py into tpcbench.py using mutually exclusive args:
- --data for Parquet files
- --catalog/--database for Iceberg tables
Co-Authored-By: Claude Opus 4.5 <[email protected]>
* fix: address review comments on README consistency
- Use --packages instead of --jars for table creation to match
create-iceberg-tpch.py usage
- Use $ICEBERG_CATALOG variable instead of hardcoding 'local' in
spark.sql.catalog config to be consistent with comet-tpch-iceberg.sh
- Clarify that JAR download is only needed for benchmark execution
Co-Authored-By: Claude Opus 4.5 <[email protected]>
---------
Co-authored-by: Claude Opus 4.5 <[email protected]>
---
dev/benchmarks/README.md | 76 +++++++++++++
dev/benchmarks/comet-tpch-iceberg.sh | 114 +++++++++++++++++++
dev/benchmarks/create-iceberg-tpch.py | 88 +++++++++++++++
dev/benchmarks/tpcbench.py | 206 +++++++++++++++++++++++++---------
4 files changed, 428 insertions(+), 56 deletions(-)
diff --git a/dev/benchmarks/README.md b/dev/benchmarks/README.md
index 2ef7a9a26..b3ea67419 100644
--- a/dev/benchmarks/README.md
+++ b/dev/benchmarks/README.md
@@ -73,3 +73,79 @@ Generating charts:
```shell
python3 generate-comparison.py --benchmark tpch --labels "Spark 3.5.3" "Comet 0.9.0" "Gluten 1.4.0" --title "TPC-H @ 100 GB (single executor, 8 cores, local Parquet files)" spark-tpch-1752338506381.json comet-tpch-1752337818039.json gluten-tpch-1752337474344.json
```
+
+## Iceberg Benchmarking
+
+Comet includes native Iceberg support via iceberg-rust integration. This enables benchmarking TPC-H queries
+against Iceberg tables with native scan acceleration.
+
+### Prerequisites
+
+Download the Iceberg Spark runtime JAR (required for running the benchmark):
+
+```shell
+wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.8.1/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
+export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
+```
+
+Note: Table creation uses `--packages` which auto-downloads the dependency.
+
+### Create Iceberg TPC-H tables
+
+Convert existing Parquet TPC-H data to Iceberg format:
+
+```shell
+export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
+export ICEBERG_CATALOG=${ICEBERG_CATALOG:-local}
+
+$SPARK_HOME/bin/spark-submit \
+ --master $SPARK_MASTER \
+ --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+ --conf spark.driver.memory=8G \
+ --conf spark.executor.instances=1 \
+ --conf spark.executor.cores=8 \
+ --conf spark.cores.max=8 \
+ --conf spark.executor.memory=16g \
+  --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \
+ --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \
+ --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \
+ create-iceberg-tpch.py \
+ --parquet-path $TPCH_DATA \
+ --catalog $ICEBERG_CATALOG \
+ --database tpch
+```
+
+### Run Iceberg benchmark
+
+```shell
+export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
+export COMET_JAR=/opt/comet/comet-spark-spark3.5_2.12-0.10.0.jar
+export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
+export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
+export TPCH_QUERIES=/mnt/bigdata/tpch/queries/
+sudo ./drop-caches.sh
+./comet-tpch-iceberg.sh
+```
+
+The benchmark uses `spark.comet.scan.icebergNative.enabled=true` to enable Comet's native iceberg-rust
+integration. Verify native scanning is active by checking for `CometIcebergNativeScanExec` in the
+physical plan output.
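That plan check can be scripted: capture the text printed by `df.explain("formatted")` and search it for the operator name. A minimal sketch; the helper name and the sample plan excerpt below are hypothetical, not actual Comet output:

```python
def uses_native_iceberg_scan(plan_text: str) -> bool:
    """Return True if the formatted physical plan mentions Comet's native Iceberg scan operator."""
    return "CometIcebergNativeScanExec" in plan_text


# Hypothetical excerpt of a formatted physical plan when the native scan is active
sample_plan = """
== Physical Plan ==
* CometColumnarToRow (2)
+- CometIcebergNativeScanExec (1)
"""

print(uses_native_iceberg_scan(sample_plan))  # True when the operator appears
```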
+
+### Iceberg-specific options
+
+| Environment Variable | Default | Description |
+| -------------------- | ---------- | ----------------------------------- |
+| `ICEBERG_CATALOG` | `local` | Iceberg catalog name |
+| `ICEBERG_DATABASE` | `tpch` | Database containing TPC-H tables |
+| `ICEBERG_WAREHOUSE` | (required) | Path to Iceberg warehouse directory |
+
+### Comparing Parquet vs Iceberg performance
+
+Run both benchmarks and compare:
+
+```shell
+python3 generate-comparison.py --benchmark tpch \
+ --labels "Comet (Parquet)" "Comet (Iceberg)" \
+ --title "TPC-H @ 100 GB: Parquet vs Iceberg" \
+ comet-tpch-*.json comet-iceberg-tpch-*.json
+```
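The result files written by `tpcbench.py` map each query number to a list of per-iteration timings. As a rough offline alternative to `generate-comparison.py`, per-query medians can be compared with plain Python; the JSON payloads below are hypothetical, trimmed to the fields this sketch uses:

```python
import json


def median(xs):
    """Middle value of a sorted copy (upper median for even-length lists)."""
    s = sorted(xs)
    return s[len(s) // 2]


def query_medians(results: dict) -> dict:
    """Median seconds per query; query entries are the list-valued keys,
    metadata keys such as 'engine' are strings and are skipped."""
    return {k: median(v) for k, v in results.items() if isinstance(v, list)}


# Hypothetical payloads mimicking two tpcbench.py result files
parquet_run = json.loads('{"engine": "datafusion-comet", "1": [4.2, 3.9, 4.0], "2": [1.1, 1.0, 1.2]}')
iceberg_run = json.loads('{"engine": "datafusion-comet", "1": [4.6, 4.4, 4.5], "2": [1.0, 0.9, 1.1]}')

for q in sorted(query_medians(parquet_run)):
    p = query_medians(parquet_run)[q]
    i = query_medians(iceberg_run)[q]
    print(f"q{q}: parquet={p:.2f}s iceberg={i:.2f}s ratio={i / p:.2f}")
```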
diff --git a/dev/benchmarks/comet-tpch-iceberg.sh b/dev/benchmarks/comet-tpch-iceberg.sh
new file mode 100755
index 000000000..7907125c8
--- /dev/null
+++ b/dev/benchmarks/comet-tpch-iceberg.sh
@@ -0,0 +1,114 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# TPC-H benchmark using Iceberg tables with Comet's native iceberg-rust integration.
+#
+# Required environment variables:
+# SPARK_HOME - Path to Spark installation
+# SPARK_MASTER - Spark master URL (e.g., spark://localhost:7077)
+# COMET_JAR - Path to Comet JAR
+# ICEBERG_JAR - Path to Iceberg Spark runtime JAR
+# ICEBERG_WAREHOUSE - Path to Iceberg warehouse directory
+# TPCH_QUERIES - Path to TPC-H query files
+#
+# Optional:
+# ICEBERG_CATALOG - Catalog name (default: local)
+# ICEBERG_DATABASE - Database name (default: tpch)
+#
+# Setup (run once to create Iceberg tables from Parquet):
+# $SPARK_HOME/bin/spark-submit \
+# --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+# --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
+# --conf spark.sql.catalog.local.type=hadoop \
+# --conf spark.sql.catalog.local.warehouse=$ICEBERG_WAREHOUSE \
+# create-iceberg-tpch.py \
+# --parquet-path $TPCH_DATA \
+# --catalog local \
+# --database tpch
+
+set -e
+
+# Defaults
+ICEBERG_CATALOG=${ICEBERG_CATALOG:-local}
+ICEBERG_DATABASE=${ICEBERG_DATABASE:-tpch}
+
+# Validate required variables
+if [ -z "$SPARK_HOME" ]; then
+ echo "Error: SPARK_HOME is not set"
+ exit 1
+fi
+if [ -z "$COMET_JAR" ]; then
+ echo "Error: COMET_JAR is not set"
+ exit 1
+fi
+if [ -z "$ICEBERG_JAR" ]; then
+ echo "Error: ICEBERG_JAR is not set"
+ echo "Download from:
https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.8.1/"
+ exit 1
+fi
+if [ -z "$ICEBERG_WAREHOUSE" ]; then
+ echo "Error: ICEBERG_WAREHOUSE is not set"
+ exit 1
+fi
+if [ -z "$TPCH_QUERIES" ]; then
+ echo "Error: TPCH_QUERIES is not set"
+ exit 1
+fi
+
+$SPARK_HOME/sbin/stop-master.sh 2>/dev/null || true
+$SPARK_HOME/sbin/stop-worker.sh 2>/dev/null || true
+
+$SPARK_HOME/sbin/start-master.sh
+$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
+
+$SPARK_HOME/bin/spark-submit \
+ --master $SPARK_MASTER \
+ --jars $COMET_JAR,$ICEBERG_JAR \
+ --driver-class-path $COMET_JAR:$ICEBERG_JAR \
+ --conf spark.driver.memory=8G \
+ --conf spark.executor.instances=1 \
+ --conf spark.executor.cores=8 \
+ --conf spark.cores.max=8 \
+ --conf spark.executor.memory=16g \
+ --conf spark.memory.offHeap.enabled=true \
+ --conf spark.memory.offHeap.size=16g \
+ --conf spark.eventLog.enabled=true \
+ --conf spark.driver.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
+ --conf spark.executor.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
+ --conf spark.plugins=org.apache.spark.CometPlugin \
+  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+ --conf spark.comet.exec.replaceSortMergeJoin=true \
+ --conf spark.comet.expression.Cast.allowIncompatible=true \
+ --conf spark.comet.enabled=true \
+ --conf spark.comet.exec.enabled=true \
+ --conf spark.comet.scan.icebergNative.enabled=true \
+ --conf spark.comet.explainFallback.enabled=true \
+  --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \
+ --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \
+ --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \
+ --conf spark.sql.defaultCatalog=${ICEBERG_CATALOG} \
+ tpcbench.py \
+ --name comet-iceberg \
+ --benchmark tpch \
+ --catalog $ICEBERG_CATALOG \
+ --database $ICEBERG_DATABASE \
+ --queries $TPCH_QUERIES \
+ --output . \
+ --iterations 1
diff --git a/dev/benchmarks/create-iceberg-tpch.py b/dev/benchmarks/create-iceberg-tpch.py
new file mode 100644
index 000000000..44f0f63a2
--- /dev/null
+++ b/dev/benchmarks/create-iceberg-tpch.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Convert TPC-H Parquet data to Iceberg tables.
+
+Usage:
+ spark-submit \
+ --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+ --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
+ --conf spark.sql.catalog.local.type=hadoop \
+ --conf spark.sql.catalog.local.warehouse=/path/to/iceberg-warehouse \
+ create-iceberg-tpch.py \
+ --parquet-path /path/to/tpch/parquet \
+ --catalog local \
+ --database tpch
+"""
+
+import argparse
+from pyspark.sql import SparkSession
+import time
+
+
+def main(parquet_path: str, catalog: str, database: str):
+ spark = SparkSession.builder \
+ .appName("Create Iceberg TPC-H Tables") \
+ .getOrCreate()
+
+ table_names = [
+ "customer",
+ "lineitem",
+ "nation",
+ "orders",
+ "part",
+ "partsupp",
+ "region",
+ "supplier"
+ ]
+
+ # Create database if it doesn't exist
+ spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog}.{database}")
+
+ for table in table_names:
+ parquet_table_path = f"{parquet_path}/{table}.parquet"
+ iceberg_table = f"{catalog}.{database}.{table}"
+
+ print(f"Converting {parquet_table_path} -> {iceberg_table}")
+ start_time = time.time()
+
+ # Drop table if exists to allow re-running
+ spark.sql(f"DROP TABLE IF EXISTS {iceberg_table}")
+
+ # Read parquet and write as Iceberg
+ df = spark.read.parquet(parquet_table_path)
+ df.writeTo(iceberg_table).using("iceberg").create()
+
+ row_count = spark.table(iceberg_table).count()
+ elapsed = time.time() - start_time
+ print(f" Created {iceberg_table} with {row_count} rows in
{elapsed:.2f}s")
+
+ print("\nAll TPC-H tables created successfully!")
+ print(f"Tables available at: {catalog}.{database}.*")
+
+ spark.stop()
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Convert TPC-H Parquet data to Iceberg tables")
+    parser.add_argument("--parquet-path", required=True, help="Path to TPC-H Parquet data directory")
+    parser.add_argument("--catalog", required=True, help="Iceberg catalog name (e.g., 'local')")
+    parser.add_argument("--database", default="tpch", help="Database name to create tables in")
+ args = parser.parse_args()
+
+ main(args.parquet_path, args.catalog, args.database)
diff --git a/dev/benchmarks/tpcbench.py b/dev/benchmarks/tpcbench.py
index 130db7a62..400ccd175 100644
--- a/dev/benchmarks/tpcbench.py
+++ b/dev/benchmarks/tpcbench.py
@@ -15,6 +15,14 @@
# specific language governing permissions and limitations
# under the License.
+"""
+TPC-H / TPC-DS benchmark runner.
+
+Supports two data sources:
+ - Files: use --data with --format (parquet, csv, json) and optional --options
+  - Iceberg tables: use --catalog and --database to specify the catalog location
+"""
+
import argparse
from datetime import datetime
import json
@@ -22,11 +30,9 @@ from pyspark.sql import SparkSession
import time
from typing import Dict
-# rename same columns aliases
-# a, a, b, b -> a, a_1, b, b_1
-#
-# Important for writing data where column name uniqueness is required
+
def dedup_columns(df):
+ """Rename duplicate column aliases: a, a, b, b -> a, a_1, b, b_1"""
counts = {}
new_cols = []
for c in df.columns:
@@ -38,30 +44,59 @@ def dedup_columns(df):
new_cols.append(f"{c}_{counts[c]}")
return df.toDF(*new_cols)
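The renaming scheme documented on `dedup_columns` can be illustrated on a plain list of names, independently of Spark. A standalone sketch of the same logic (the helper name is hypothetical; the real function operates on `df.columns` and returns a renamed DataFrame):

```python
def dedup_names(cols):
    """Rename duplicate names: a, a, b, b -> a, a_1, b, b_1."""
    counts = {}
    new_cols = []
    for c in cols:
        if c not in counts:
            # First occurrence keeps its original name
            counts[c] = 0
            new_cols.append(c)
        else:
            # Later occurrences get a numeric suffix
            counts[c] += 1
            new_cols.append(f"{c}_{counts[c]}")
    return new_cols


print(dedup_names(["a", "a", "b", "b"]))  # ['a', 'a_1', 'b', 'b_1']
```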
-def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str, name: str, format: str, query_num: int = None, write_path: str = None, options: Dict[str, str] = None):
- # Initialize a SparkSession
+def main(
+ benchmark: str,
+ data_path: str,
+ catalog: str,
+ database: str,
+ query_path: str,
+ iterations: int,
+ output: str,
+ name: str,
+ format: str,
+ query_num: int = None,
+ write_path: str = None,
+ options: Dict[str, str] = None
+):
+ if options is None:
+ options = {}
+
spark = SparkSession.builder \
.appName(f"{name} benchmark derived from {benchmark}") \
.getOrCreate()
- # Register the tables
+ # Define tables for each benchmark
if benchmark == "tpch":
num_queries = 22
- table_names = ["customer", "lineitem", "nation", "orders", "part",
"partsupp", "region", "supplier"]
+ table_names = [
+ "customer", "lineitem", "nation", "orders",
+ "part", "partsupp", "region", "supplier"
+ ]
elif benchmark == "tpcds":
num_queries = 99
- table_names = ["call_center", "catalog_page", "catalog_returns",
"catalog_sales", "customer",
- "customer_address", "customer_demographics", "date_dim",
"time_dim", "household_demographics",
- "income_band", "inventory", "item", "promotion", "reason",
"ship_mode", "store", "store_returns",
- "store_sales", "warehouse", "web_page", "web_returns", "web_sales",
"web_site"]
+ table_names = [
+ "call_center", "catalog_page", "catalog_returns", "catalog_sales",
+ "customer", "customer_address", "customer_demographics",
"date_dim",
+ "time_dim", "household_demographics", "income_band", "inventory",
+ "item", "promotion", "reason", "ship_mode", "store",
"store_returns",
+ "store_sales", "warehouse", "web_page", "web_returns", "web_sales",
+ "web_site"
+ ]
else:
- raise "invalid benchmark"
+ raise ValueError(f"Invalid benchmark: {benchmark}")
+ # Register tables from either files or Iceberg catalog
+ using_iceberg = catalog is not None
for table in table_names:
- path = f"{data_path}/{table}.{format}"
- print(f"Registering table {table} using path {path}")
- df = spark.read.format(format).options(**options).load(path)
+ if using_iceberg:
+ source = f"{catalog}.{database}.{table}"
+ print(f"Registering table {table} from {source}")
+ df = spark.table(source)
+ else:
+ source = f"{data_path}/{table}.{format}"
+ print(f"Registering table {table} from {source}")
+ df = spark.read.format(format).options(**options).load(source)
df.createOrReplaceTempView(table)
conf_dict = {k: v for k, v in spark.sparkContext.getConf().getAll()}
@@ -69,95 +104,154 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
results = {
'engine': 'datafusion-comet',
'benchmark': benchmark,
- 'data_path': data_path,
'query_path': query_path,
'spark_conf': conf_dict,
}
+ if using_iceberg:
+ results['catalog'] = catalog
+ results['database'] = database
+ else:
+ results['data_path'] = data_path
- for iteration in range(0, iterations):
- print(f"Starting iteration {iteration} of {iterations}")
+ for iteration in range(iterations):
+ print(f"\n{'='*60}")
+ print(f"Starting iteration {iteration + 1} of {iterations}")
+ print(f"{'='*60}")
iter_start_time = time.time()
# Determine which queries to run
if query_num is not None:
- # Validate query number
if query_num < 1 or query_num > num_queries:
- raise ValueError(f"Query number {query_num} is out of range.
Valid range is 1-{num_queries} for {benchmark}")
+ raise ValueError(
+ f"Query number {query_num} out of range. "
+ f"Valid: 1-{num_queries} for {benchmark}"
+ )
queries_to_run = [query_num]
else:
- queries_to_run = range(1, num_queries+1)
+ queries_to_run = range(1, num_queries + 1)
for query in queries_to_run:
spark.sparkContext.setJobDescription(f"{benchmark} q{query}")
- # read text file
path = f"{query_path}/q{query}.sql"
+ print(f"\nRunning query {query} from {path}")
- print(f"Reading query {query} using path {path}")
with open(path, "r") as f:
text = f.read()
- # each file can contain multiple queries
queries = text.split(";")
start_time = time.time()
for sql in queries:
                sql = sql.strip().replace("create view", "create temp view")
if len(sql) > 0:
- print(f"Executing: {sql}")
+ print(f"Executing: {sql[:100]}...")
df = spark.sql(sql)
df.explain("formatted")
if write_path is not None:
- # skip results with empty schema
- # coming across for running DDL stmt
if len(df.columns) > 0:
output_path = f"{write_path}/q{query}"
- # rename same column names for output
- # a, a, b, b => a, a_1, b, b_1
- # output doesn't allow non unique column names
deduped = dedup_columns(df)
-                            # sort by all columns to have predictable output dataset for comparison
                            deduped.orderBy(*deduped.columns).coalesce(1).write.mode("overwrite").parquet(output_path)
-                            print(f"Query {query} results written to {output_path}")
-                        else:
-                            print(f"Skipping write: DataFrame has no schema for {output_path}")
+ print(f"Results written to {output_path}")
else:
rows = df.collect()
print(f"Query {query} returned {len(rows)} rows")
end_time = time.time()
- print(f"Query {query} took {end_time - start_time} seconds")
+ elapsed = end_time - start_time
+ print(f"Query {query} took {elapsed:.2f} seconds")
-            # store timings in list and later add option to run > 1 iterations
query_timings = results.setdefault(query, [])
- query_timings.append(end_time - start_time)
+ query_timings.append(elapsed)
iter_end_time = time.time()
- print(f"Iteration {iteration} took {round(iter_end_time -
iter_start_time,2)} seconds")
+ print(f"\nIteration {iteration + 1} took {iter_end_time -
iter_start_time:.2f} seconds")
- str = json.dumps(results, indent=4)
+ # Write results
+ result_str = json.dumps(results, indent=4)
current_time_millis = int(datetime.now().timestamp() * 1000)
results_path = f"{output}/{name}-{benchmark}-{current_time_millis}.json"
- print(f"Writing results to {results_path}")
+ print(f"\nWriting results to {results_path}")
with open(results_path, "w") as f:
- f.write(str)
+ f.write(result_str)
- # Stop the SparkSession
spark.stop()
+
if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="DataFusion benchmark derived from TPC-H / TPC-DS")
-    parser.add_argument("--benchmark", required=True, default="tpch", help="Benchmark to run (tpch or tpcds)")
-    parser.add_argument("--data", required=True, help="Path to data files")
-    parser.add_argument("--queries", required=True, help="Path to query files")
-    parser.add_argument("--iterations", required=False, default="1", help="How many iterations to run")
-    parser.add_argument("--output", required=True, help="Path to write output")
-    parser.add_argument("--name", required=True, help="Prefix for result file e.g. spark/comet/gluten")
-    parser.add_argument("--query", required=False, type=int, help="Specific query number to run (1-based). If not specified, all queries will be run.")
-    parser.add_argument("--write", required=False, help="Path to save query results to, in Parquet format.")
-    parser.add_argument("--format", required=True, default="parquet", help="Input file format (parquet, csv, json)")
-    parser.add_argument("--options", type=json.loads, required=False, default={}, help='Spark options as JSON string, e.g., \'{"header": "true", "delimiter": ","}\'')
-    args = parser.parse_args()
+ parser = argparse.ArgumentParser(
+ description="TPC-H/TPC-DS benchmark runner for files or Iceberg tables"
+ )
+ parser.add_argument(
+ "--benchmark", required=True,
+ help="Benchmark to run (tpch or tpcds)"
+ )
+
+ # Data source - mutually exclusive: either file path or Iceberg catalog
+ source_group = parser.add_mutually_exclusive_group(required=True)
+ source_group.add_argument(
+ "--data",
+ help="Path to data files"
+ )
+ source_group.add_argument(
+ "--catalog",
+ help="Iceberg catalog name"
+ )
-    main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.format, args.query, args.write, args.options)
+ # Options for file-based reading
+ parser.add_argument(
+ "--format", default="parquet",
+ help="Input file format: parquet, csv, json (only used with --data)"
+ )
+ parser.add_argument(
+ "--options", type=json.loads, default={},
+        help='Spark reader options as JSON string, e.g., \'{"header": "true"}\' (only used with --data)'
+ )
+
+ # Options for Iceberg
+ parser.add_argument(
+ "--database", default="tpch",
+ help="Database containing TPC tables (only used with --catalog)"
+ )
+
+ parser.add_argument(
+ "--queries", required=True,
+ help="Path to query SQL files"
+ )
+ parser.add_argument(
+ "--iterations", type=int, default=1,
+ help="Number of iterations"
+ )
+ parser.add_argument(
+ "--output", required=True,
+ help="Path to write results JSON"
+ )
+ parser.add_argument(
+ "--name", required=True,
+ help="Prefix for result file"
+ )
+ parser.add_argument(
+ "--query", type=int,
+ help="Specific query number (1-based). If omitted, run all."
+ )
+ parser.add_argument(
+ "--write",
+ help="Path to save query results as Parquet"
+ )
+ args = parser.parse_args()
+ main(
+ args.benchmark,
+ args.data,
+ args.catalog,
+ args.database,
+ args.queries,
+ args.iterations,
+ args.output,
+ args.name,
+ args.format,
+ args.query,
+ args.write,
+ args.options
+ )
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]