This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new a2a9467e9 chore: Add Iceberg TPC-H benchmarking scripts (#3294)
a2a9467e9 is described below

commit a2a9467e93aab51d30e8f226a55b8e456190f045
Author: Andy Grove <[email protected]>
AuthorDate: Mon Feb 9 08:53:35 2026 -0700

    chore: Add Iceberg TPC-H benchmarking scripts (#3294)
    
    * [WIP] Add Iceberg TPC-H benchmarking scripts
    
    Add scripts to benchmark TPC-H queries against Iceberg tables using
    Comet's native iceberg-rust integration:
    
    - create-iceberg-tpch.py: Convert Parquet TPC-H data to Iceberg tables
    - tpcbench-iceberg.py: Run TPC-H queries against Iceberg catalog tables
    - comet-tpch-iceberg.sh: Shell script to run the benchmark with Comet
    
    Also updates README.md with Iceberg benchmarking documentation.
    
    Co-Authored-By: Claude Opus 4.5 <[email protected]>
    
    * fix
    
    * fix
    
    * Consolidate Parquet and Iceberg benchmark scripts into single tpcbench.py
    
    Merge tpcbench-iceberg.py into tpcbench.py using mutually exclusive args:
    - --data for Parquet files
    - --catalog/--database for Iceberg tables
    
    Co-Authored-By: Claude Opus 4.5 <[email protected]>
    
    * fix: address review comments on README consistency
    
    - Use --packages instead of --jars for table creation to match
      create-iceberg-tpch.py usage
    - Use $ICEBERG_CATALOG variable instead of hardcoding 'local' in
      spark.sql.catalog config to be consistent with comet-tpch-iceberg.sh
    - Clarify that JAR download is only needed for benchmark execution
    
    Co-Authored-By: Claude Opus 4.5 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.5 <[email protected]>
---
 dev/benchmarks/README.md              |  76 +++++++++++++
 dev/benchmarks/comet-tpch-iceberg.sh  | 114 +++++++++++++++++++
 dev/benchmarks/create-iceberg-tpch.py |  88 +++++++++++++++
 dev/benchmarks/tpcbench.py            | 206 +++++++++++++++++++++++++---------
 4 files changed, 428 insertions(+), 56 deletions(-)
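
Note on the consolidated CLI: the commit message describes --data and
--catalog as mutually exclusive. A minimal, Spark-free sketch of that argparse
pattern is shown here. It covers only a subset of the flags, and build_parser
is a hypothetical helper name used for illustration; it does not appear in the
commit.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical helper: a subset of the tpcbench.py flags, for illustration."""
    parser = argparse.ArgumentParser(description="data source selection sketch")
    # Exactly one of --data / --catalog must be supplied.
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument("--data", help="Path to data files")
    source.add_argument("--catalog", help="Iceberg catalog name")
    # Kept outside the group so it can carry a default; only read with --catalog.
    parser.add_argument("--database", default="tpch")
    return parser


iceberg_args = build_parser().parse_args(["--catalog", "local"])
file_args = build_parser().parse_args(["--data", "/mnt/bigdata/tpch"])
```

Passing both --data and --catalog makes argparse exit with a usage error.
Because --database sits outside the group, it is still accepted alongside
--data and is simply unused in that case.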

diff --git a/dev/benchmarks/README.md b/dev/benchmarks/README.md
index 2ef7a9a26..b3ea67419 100644
--- a/dev/benchmarks/README.md
+++ b/dev/benchmarks/README.md
@@ -73,3 +73,79 @@ Generating charts:
 ```shell
 python3 generate-comparison.py --benchmark tpch --labels "Spark 3.5.3" "Comet 0.9.0" "Gluten 1.4.0" --title "TPC-H @ 100 GB (single executor, 8 cores, local Parquet files)" spark-tpch-1752338506381.json comet-tpch-1752337818039.json gluten-tpch-1752337474344.json
 ```
+
+## Iceberg Benchmarking
+
+Comet includes native Iceberg support via iceberg-rust integration. This enables benchmarking TPC-H queries
+against Iceberg tables with native scan acceleration.
+
+### Prerequisites
+
+Download the Iceberg Spark runtime JAR (required for running the benchmark):
+
+```shell
+wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.8.1/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
+export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
+```
+
+Note: Table creation uses `--packages` which auto-downloads the dependency.
+
+### Create Iceberg TPC-H tables
+
+Convert existing Parquet TPC-H data to Iceberg format:
+
+```shell
+export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
+export ICEBERG_CATALOG=${ICEBERG_CATALOG:-local}
+
+$SPARK_HOME/bin/spark-submit \
+    --master $SPARK_MASTER \
+    --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+    --conf spark.driver.memory=8G \
+    --conf spark.executor.instances=1 \
+    --conf spark.executor.cores=8 \
+    --conf spark.cores.max=8 \
+    --conf spark.executor.memory=16g \
+    --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \
+    --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \
+    --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \
+    create-iceberg-tpch.py \
+    --parquet-path $TPCH_DATA \
+    --catalog $ICEBERG_CATALOG \
+    --database tpch
+```
+
+### Run Iceberg benchmark
+
+```shell
+export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
+export COMET_JAR=/opt/comet/comet-spark-spark3.5_2.12-0.10.0.jar
+export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
+export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
+export TPCH_QUERIES=/mnt/bigdata/tpch/queries/
+sudo ./drop-caches.sh
+./comet-tpch-iceberg.sh
+```
+
+The benchmark uses `spark.comet.scan.icebergNative.enabled=true` to enable Comet's native iceberg-rust
+integration. Verify native scanning is active by checking for `CometIcebergNativeScanExec` in the
+physical plan output.
+
+### Iceberg-specific options
+
+| Environment Variable | Default    | Description                         |
+| -------------------- | ---------- | ----------------------------------- |
+| `ICEBERG_CATALOG`    | `local`    | Iceberg catalog name                |
+| `ICEBERG_DATABASE`   | `tpch`     | Database containing TPC-H tables    |
+| `ICEBERG_WAREHOUSE`  | (required) | Path to Iceberg warehouse directory |
+
+### Comparing Parquet vs Iceberg performance
+
+Run both benchmarks and compare:
+
+```shell
+python3 generate-comparison.py --benchmark tpch \
+    --labels "Comet (Parquet)" "Comet (Iceberg)" \
+    --title "TPC-H @ 100 GB: Parquet vs Iceberg" \
+    comet-tpch-*.json comet-iceberg-tpch-*.json
+```
diff --git a/dev/benchmarks/comet-tpch-iceberg.sh b/dev/benchmarks/comet-tpch-iceberg.sh
new file mode 100755
index 000000000..7907125c8
--- /dev/null
+++ b/dev/benchmarks/comet-tpch-iceberg.sh
@@ -0,0 +1,114 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# TPC-H benchmark using Iceberg tables with Comet's native iceberg-rust integration.
+#
+# Required environment variables:
+#   SPARK_HOME      - Path to Spark installation
+#   SPARK_MASTER    - Spark master URL (e.g., spark://localhost:7077)
+#   COMET_JAR       - Path to Comet JAR
+#   ICEBERG_JAR     - Path to Iceberg Spark runtime JAR
+#   ICEBERG_WAREHOUSE - Path to Iceberg warehouse directory
+#   TPCH_QUERIES    - Path to TPC-H query files
+#
+# Optional:
+#   ICEBERG_CATALOG - Catalog name (default: local)
+#   ICEBERG_DATABASE - Database name (default: tpch)
+#
+# Setup (run once to create Iceberg tables from Parquet):
+#   $SPARK_HOME/bin/spark-submit \
+#       --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+#       --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
+#       --conf spark.sql.catalog.local.type=hadoop \
+#       --conf spark.sql.catalog.local.warehouse=$ICEBERG_WAREHOUSE \
+#       create-iceberg-tpch.py \
+#       --parquet-path $TPCH_DATA \
+#       --catalog local \
+#       --database tpch
+
+set -e
+
+# Defaults
+ICEBERG_CATALOG=${ICEBERG_CATALOG:-local}
+ICEBERG_DATABASE=${ICEBERG_DATABASE:-tpch}
+
+# Validate required variables
+if [ -z "$SPARK_HOME" ]; then
+    echo "Error: SPARK_HOME is not set"
+    exit 1
+fi
+if [ -z "$COMET_JAR" ]; then
+    echo "Error: COMET_JAR is not set"
+    exit 1
+fi
+if [ -z "$ICEBERG_JAR" ]; then
+    echo "Error: ICEBERG_JAR is not set"
+    echo "Download from: https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.8.1/"
+    exit 1
+fi
+if [ -z "$ICEBERG_WAREHOUSE" ]; then
+    echo "Error: ICEBERG_WAREHOUSE is not set"
+    exit 1
+fi
+if [ -z "$TPCH_QUERIES" ]; then
+    echo "Error: TPCH_QUERIES is not set"
+    exit 1
+fi
+
+$SPARK_HOME/sbin/stop-master.sh 2>/dev/null || true
+$SPARK_HOME/sbin/stop-worker.sh 2>/dev/null || true
+
+$SPARK_HOME/sbin/start-master.sh
+$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
+
+$SPARK_HOME/bin/spark-submit \
+    --master $SPARK_MASTER \
+    --jars $COMET_JAR,$ICEBERG_JAR \
+    --driver-class-path $COMET_JAR:$ICEBERG_JAR \
+    --conf spark.driver.memory=8G \
+    --conf spark.executor.instances=1 \
+    --conf spark.executor.cores=8 \
+    --conf spark.cores.max=8 \
+    --conf spark.executor.memory=16g \
+    --conf spark.memory.offHeap.enabled=true \
+    --conf spark.memory.offHeap.size=16g \
+    --conf spark.eventLog.enabled=true \
+    --conf spark.driver.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
+    --conf spark.executor.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
+    --conf spark.plugins=org.apache.spark.CometPlugin \
+    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+    --conf spark.comet.exec.replaceSortMergeJoin=true \
+    --conf spark.comet.expression.Cast.allowIncompatible=true \
+    --conf spark.comet.enabled=true \
+    --conf spark.comet.exec.enabled=true \
+    --conf spark.comet.scan.icebergNative.enabled=true \
+    --conf spark.comet.explainFallback.enabled=true \
+    --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \
+    --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \
+    --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \
+    --conf spark.sql.defaultCatalog=${ICEBERG_CATALOG} \
+    tpcbench.py \
+    --name comet-iceberg \
+    --benchmark tpch \
+    --catalog $ICEBERG_CATALOG \
+    --database $ICEBERG_DATABASE \
+    --queries $TPCH_QUERIES \
+    --output . \
+    --iterations 1
diff --git a/dev/benchmarks/create-iceberg-tpch.py b/dev/benchmarks/create-iceberg-tpch.py
new file mode 100644
index 000000000..44f0f63a2
--- /dev/null
+++ b/dev/benchmarks/create-iceberg-tpch.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Convert TPC-H Parquet data to Iceberg tables.
+
+Usage:
+    spark-submit \
+        --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+        --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
+        --conf spark.sql.catalog.local.type=hadoop \
+        --conf spark.sql.catalog.local.warehouse=/path/to/iceberg-warehouse \
+        create-iceberg-tpch.py \
+        --parquet-path /path/to/tpch/parquet \
+        --catalog local \
+        --database tpch
+"""
+
+import argparse
+from pyspark.sql import SparkSession
+import time
+
+
+def main(parquet_path: str, catalog: str, database: str):
+    spark = SparkSession.builder \
+        .appName("Create Iceberg TPC-H Tables") \
+        .getOrCreate()
+
+    table_names = [
+        "customer",
+        "lineitem",
+        "nation",
+        "orders",
+        "part",
+        "partsupp",
+        "region",
+        "supplier"
+    ]
+
+    # Create database if it doesn't exist
+    spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog}.{database}")
+
+    for table in table_names:
+        parquet_table_path = f"{parquet_path}/{table}.parquet"
+        iceberg_table = f"{catalog}.{database}.{table}"
+
+        print(f"Converting {parquet_table_path} -> {iceberg_table}")
+        start_time = time.time()
+
+        # Drop table if exists to allow re-running
+        spark.sql(f"DROP TABLE IF EXISTS {iceberg_table}")
+
+        # Read parquet and write as Iceberg
+        df = spark.read.parquet(parquet_table_path)
+        df.writeTo(iceberg_table).using("iceberg").create()
+
+        row_count = spark.table(iceberg_table).count()
+        elapsed = time.time() - start_time
+        print(f"  Created {iceberg_table} with {row_count} rows in {elapsed:.2f}s")
+
+    print("\nAll TPC-H tables created successfully!")
+    print(f"Tables available at: {catalog}.{database}.*")
+
+    spark.stop()
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Convert TPC-H Parquet data to Iceberg tables")
+    parser.add_argument("--parquet-path", required=True, help="Path to TPC-H Parquet data directory")
+    parser.add_argument("--catalog", required=True, help="Iceberg catalog name (e.g., 'local')")
+    parser.add_argument("--database", default="tpch", help="Database name to create tables in")
+    args = parser.parse_args()
+
+    main(args.parquet_path, args.catalog, args.database)
diff --git a/dev/benchmarks/tpcbench.py b/dev/benchmarks/tpcbench.py
index 130db7a62..400ccd175 100644
--- a/dev/benchmarks/tpcbench.py
+++ b/dev/benchmarks/tpcbench.py
@@ -15,6 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
+"""
+TPC-H / TPC-DS benchmark runner.
+
+Supports two data sources:
+  - Files: use --data with --format (parquet, csv, json) and optional --options
+  - Iceberg tables: use --catalog and --database to specify the catalog location
+"""
+
 import argparse
 from datetime import datetime
 import json
@@ -22,11 +30,9 @@ from pyspark.sql import SparkSession
 import time
 from typing import Dict
 
-# rename same columns aliases
-# a, a, b, b -> a, a_1, b, b_1
-#
-# Important for writing data where column name uniqueness is required
+
 def dedup_columns(df):
+    """Rename duplicate column aliases: a, a, b, b -> a, a_1, b, b_1"""
     counts = {}
     new_cols = []
     for c in df.columns:
@@ -38,30 +44,59 @@ def dedup_columns(df):
             new_cols.append(f"{c}_{counts[c]}")
     return df.toDF(*new_cols)
 
-def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str, name: str, format: str, query_num: int = None, write_path: str = None, options: Dict[str, str] = None):
 
-    # Initialize a SparkSession
+def main(
+    benchmark: str,
+    data_path: str,
+    catalog: str,
+    database: str,
+    query_path: str,
+    iterations: int,
+    output: str,
+    name: str,
+    format: str,
+    query_num: int = None,
+    write_path: str = None,
+    options: Dict[str, str] = None
+):
+    if options is None:
+        options = {}
+
     spark = SparkSession.builder \
         .appName(f"{name} benchmark derived from {benchmark}") \
         .getOrCreate()
 
-    # Register the tables
+    # Define tables for each benchmark
     if benchmark == "tpch":
         num_queries = 22
-        table_names = ["customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier"]
+        table_names = [
+            "customer", "lineitem", "nation", "orders",
+            "part", "partsupp", "region", "supplier"
+        ]
     elif benchmark == "tpcds":
         num_queries = 99
-        table_names = ["call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer",
-           "customer_address", "customer_demographics", "date_dim", "time_dim", "household_demographics",
-           "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns",
-           "store_sales", "warehouse", "web_page", "web_returns", "web_sales", "web_site"]
+        table_names = [
+            "call_center", "catalog_page", "catalog_returns", "catalog_sales",
+            "customer", "customer_address", "customer_demographics", "date_dim",
+            "time_dim", "household_demographics", "income_band", "inventory",
+            "item", "promotion", "reason", "ship_mode", "store", "store_returns",
+            "store_sales", "warehouse", "web_page", "web_returns", "web_sales",
+            "web_site"
+        ]
     else:
-        raise "invalid benchmark"
+        raise ValueError(f"Invalid benchmark: {benchmark}")
 
+    # Register tables from either files or Iceberg catalog
+    using_iceberg = catalog is not None
     for table in table_names:
-        path = f"{data_path}/{table}.{format}"
-        print(f"Registering table {table} using path {path}")
-        df = spark.read.format(format).options(**options).load(path)
+        if using_iceberg:
+            source = f"{catalog}.{database}.{table}"
+            print(f"Registering table {table} from {source}")
+            df = spark.table(source)
+        else:
+            source = f"{data_path}/{table}.{format}"
+            print(f"Registering table {table} from {source}")
+            df = spark.read.format(format).options(**options).load(source)
         df.createOrReplaceTempView(table)
 
     conf_dict = {k: v for k, v in spark.sparkContext.getConf().getAll()}
@@ -69,95 +104,154 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
     results = {
         'engine': 'datafusion-comet',
         'benchmark': benchmark,
-        'data_path': data_path,
         'query_path': query_path,
         'spark_conf': conf_dict,
     }
+    if using_iceberg:
+        results['catalog'] = catalog
+        results['database'] = database
+    else:
+        results['data_path'] = data_path
 
-    for iteration in range(0, iterations):
-        print(f"Starting iteration {iteration} of {iterations}")
+    for iteration in range(iterations):
+        print(f"\n{'='*60}")
+        print(f"Starting iteration {iteration + 1} of {iterations}")
+        print(f"{'='*60}")
         iter_start_time = time.time()
 
         # Determine which queries to run
         if query_num is not None:
-            # Validate query number
             if query_num < 1 or query_num > num_queries:
-                raise ValueError(f"Query number {query_num} is out of range. Valid range is 1-{num_queries} for {benchmark}")
+                raise ValueError(
+                    f"Query number {query_num} out of range. "
+                    f"Valid: 1-{num_queries} for {benchmark}"
+                )
             queries_to_run = [query_num]
         else:
-            queries_to_run = range(1, num_queries+1)
+            queries_to_run = range(1, num_queries + 1)
 
         for query in queries_to_run:
             spark.sparkContext.setJobDescription(f"{benchmark} q{query}")
 
-            # read text file
             path = f"{query_path}/q{query}.sql"
+            print(f"\nRunning query {query} from {path}")
 
-            print(f"Reading query {query} using path {path}")
             with open(path, "r") as f:
                 text = f.read()
-                # each file can contain multiple queries
                 queries = text.split(";")
 
                 start_time = time.time()
                 for sql in queries:
                     sql = sql.strip().replace("create view", "create temp view")
                     if len(sql) > 0:
-                        print(f"Executing: {sql}")
+                        print(f"Executing: {sql[:100]}...")
                         df = spark.sql(sql)
                         df.explain("formatted")
 
                         if write_path is not None:
-                            # skip results with empty schema
-                            # coming across for running DDL stmt
                             if len(df.columns) > 0:
                                 output_path = f"{write_path}/q{query}"
-                                # rename same column names for output
-                                # a, a, b, b => a, a_1, b, b_1
-                                # output doesn't allow non unique column names
                                 deduped = dedup_columns(df)
-                                # sort by all columns to have predictable output dataset for comparison
                                 deduped.orderBy(*deduped.columns).coalesce(1).write.mode("overwrite").parquet(output_path)
-                                print(f"Query {query} results written to {output_path}")
-                            else:
-                                print(f"Skipping write: DataFrame has no schema for {output_path}")
+                                print(f"Results written to {output_path}")
                         else:
                             rows = df.collect()
                             print(f"Query {query} returned {len(rows)} rows")
 
                 end_time = time.time()
-                print(f"Query {query} took {end_time - start_time} seconds")
+                elapsed = end_time - start_time
+                print(f"Query {query} took {elapsed:.2f} seconds")
 
-                # store timings in list and later add option to run > 1 iterations
                 query_timings = results.setdefault(query, [])
-                query_timings.append(end_time - start_time)
+                query_timings.append(elapsed)
 
         iter_end_time = time.time()
-        print(f"Iteration {iteration} took {round(iter_end_time - iter_start_time,2)} seconds")
+        print(f"\nIteration {iteration + 1} took {iter_end_time - iter_start_time:.2f} seconds")
 
-    str = json.dumps(results, indent=4)
+    # Write results
+    result_str = json.dumps(results, indent=4)
     current_time_millis = int(datetime.now().timestamp() * 1000)
     results_path = f"{output}/{name}-{benchmark}-{current_time_millis}.json"
-    print(f"Writing results to {results_path}")
+    print(f"\nWriting results to {results_path}")
     with open(results_path, "w") as f:
-        f.write(str)
+        f.write(result_str)
 
-    # Stop the SparkSession
     spark.stop()
 
+
 if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="DataFusion benchmark derived from TPC-H / TPC-DS")
-    parser.add_argument("--benchmark", required=True, default="tpch", help="Benchmark to run (tpch or tpcds)")
-    parser.add_argument("--data", required=True, help="Path to data files")
-    parser.add_argument("--queries", required=True, help="Path to query files")
-    parser.add_argument("--iterations", required=False, default="1", help="How many iterations to run")
-    parser.add_argument("--output", required=True, help="Path to write output")
-    parser.add_argument("--name", required=True, help="Prefix for result file e.g. spark/comet/gluten")
-    parser.add_argument("--query", required=False, type=int, help="Specific query number to run (1-based). If not specified, all queries will be run.")
-    parser.add_argument("--write", required=False, help="Path to save query results to, in Parquet format.")
-    parser.add_argument("--format", required=True, default="parquet", help="Input file format (parquet, csv, json)")
-    parser.add_argument("--options", type=json.loads, required=False, default={}, help='Spark options as JSON string, e.g., \'{"header": "true", "delimiter": ","}\'')
-    args = parser.parse_args()
+    parser = argparse.ArgumentParser(
+        description="TPC-H/TPC-DS benchmark runner for files or Iceberg tables"
+    )
+    parser.add_argument(
+        "--benchmark", required=True,
+        help="Benchmark to run (tpch or tpcds)"
+    )
+
+    # Data source - mutually exclusive: either file path or Iceberg catalog
+    source_group = parser.add_mutually_exclusive_group(required=True)
+    source_group.add_argument(
+        "--data",
+        help="Path to data files"
+    )
+    source_group.add_argument(
+        "--catalog",
+        help="Iceberg catalog name"
+    )
 
-    main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.format, args.query, args.write, args.options)
+    # Options for file-based reading
+    parser.add_argument(
+        "--format", default="parquet",
+        help="Input file format: parquet, csv, json (only used with --data)"
+    )
+    parser.add_argument(
+        "--options", type=json.loads, default={},
+        help='Spark reader options as JSON string, e.g., \'{"header": "true"}\' (only used with --data)'
+    )
+
+    # Options for Iceberg
+    parser.add_argument(
+        "--database", default="tpch",
+        help="Database containing TPC tables (only used with --catalog)"
+    )
+
+    parser.add_argument(
+        "--queries", required=True,
+        help="Path to query SQL files"
+    )
+    parser.add_argument(
+        "--iterations", type=int, default=1,
+        help="Number of iterations"
+    )
+    parser.add_argument(
+        "--output", required=True,
+        help="Path to write results JSON"
+    )
+    parser.add_argument(
+        "--name", required=True,
+        help="Prefix for result file"
+    )
+    parser.add_argument(
+        "--query", type=int,
+        help="Specific query number (1-based). If omitted, run all."
+    )
+    parser.add_argument(
+        "--write",
+        help="Path to save query results as Parquet"
+    )
+    args = parser.parse_args()
 
+    main(
+        args.benchmark,
+        args.data,
+        args.catalog,
+        args.database,
+        args.queries,
+        args.iterations,
+        args.output,
+        args.name,
+        args.format,
+        args.query,
+        args.write,
+        args.options
+    )
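
Reading aid for the tpcbench.py hunk above: the table-registration branch and
the results metadata can be viewed as pure functions of the parsed arguments.
The sketch below mirrors that branching without Spark. The names table_source
and source_metadata are hypothetical and do not appear in the commit.

```python
from typing import Dict, Optional


def table_source(table: str, data_path: Optional[str], catalog: Optional[str],
                 database: str = "tpch", fmt: str = "parquet") -> str:
    """Return the identifier or path a table is read from (hypothetical helper)."""
    if catalog is not None:
        # Iceberg: catalog-qualified name, which the script reads via spark.table(...)
        return f"{catalog}.{database}.{table}"
    # Files: one path per table, loaded via spark.read.format(fmt).load(...)
    return f"{data_path}/{table}.{fmt}"


def source_metadata(data_path: Optional[str], catalog: Optional[str],
                    database: str) -> Dict[str, str]:
    """Return the source-specific keys recorded in the results JSON (hypothetical helper)."""
    if catalog is not None:
        return {"catalog": catalog, "database": database}
    return {"data_path": data_path}


iceberg_source = table_source("lineitem", None, "local")
file_source = table_source("lineitem", "/mnt/bigdata/tpch", None)
```

Either way the script registers the resulting DataFrame as a temp view named
after the table, so the query files can stay identical for both sources.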

