This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 5f39006d5 feat: Add option to write benchmark results to file (#2640)
5f39006d5 is described below
commit 5f39006d5ef2d3b5e84fa68cf194baf14a383435
Author: Andy Grove <[email protected]>
AuthorDate: Thu Oct 23 13:20:43 2025 -0600
feat: Add option to write benchmark results to file (#2640)
* Add option to write benchmark results to file
* add option to write query results to Parquet
* write single file
* address feedback
---
 dev/benchmarks/comet-tpch.sh |  1 +
 dev/benchmarks/tpcbench.py   | 15 +++++++++++----
 2 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/dev/benchmarks/comet-tpch.sh b/dev/benchmarks/comet-tpch.sh
index e1f6f969f..df95565fe 100755
--- a/dev/benchmarks/comet-tpch.sh
+++ b/dev/benchmarks/comet-tpch.sh
@@ -50,4 +50,5 @@ $SPARK_HOME/bin/spark-submit \
     --data $TPCH_DATA \
     --queries $TPCH_QUERIES \
     --output . \
+    --write /tmp \
     --iterations 1
diff --git a/dev/benchmarks/tpcbench.py b/dev/benchmarks/tpcbench.py
index 031c19d8c..39c34ca7c 100644
--- a/dev/benchmarks/tpcbench.py
+++ b/dev/benchmarks/tpcbench.py
@@ -21,7 +21,7 @@ import json
 from pyspark.sql import SparkSession
 import time

-def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str, name: str, query_num: int = None):
+def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str, name: str, query_num: int = None, write_path: str = None):

     # Initialize a SparkSession
     spark = SparkSession.builder \
@@ -89,10 +89,16 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
             print(f"Executing: {sql}")
             df = spark.sql(sql)
             df.explain()
-            rows = df.collect()
+
+            if write_path is not None:
+                output_path = f"{write_path}/q{query}"
+                df.coalesce(1).write.mode("overwrite").parquet(output_path)
+                print(f"Query {query} results written to {output_path}")
+            else:
+                rows = df.collect()
+                print(f"Query {query} returned {len(rows)} rows")
             df.explain()
-            print(f"Query {query} returned {len(rows)} rows")

             end_time = time.time()
             print(f"Query {query} took {end_time - start_time} seconds")
@@ -123,6 +129,7 @@ if __name__ == "__main__":
     parser.add_argument("--output", required=True, help="Path to write output")
     parser.add_argument("--name", required=True, help="Prefix for result file e.g. spark/comet/gluten")
     parser.add_argument("--query", required=False, type=int, help="Specific query number to run (1-based). If not specified, all queries will be run.")
+    parser.add_argument("--write", required=False, help="Path to save query results to, in Parquet format.")
     args = parser.parse_args()
-    main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.query)
\ No newline at end of file
+    main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.query, args.write)
\ No newline at end of file
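
For anyone trying the new option: a minimal sketch of reading back what it
writes (assumptions: per the diff above, tpcbench.py writes each query's
result to <write-path>/q<query> as a directory holding one coalesced Parquet
part file, so the /tmp/q1 path below corresponds to query 1 run with
--write /tmp):

    # Read back query 1's results written by "tpcbench.py ... --write /tmp".
    # The /tmp/q1 path is an example; substitute your own --write path.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("inspect-tpch-results").getOrCreate()

    # Each q<n> directory holds a single coalesced Parquet part file.
    df = spark.read.parquet("/tmp/q1")
    df.show(truncate=False)
    print(f"Query 1 returned {df.count()} rows")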
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]