This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-benchmarks.git
The following commit(s) were added to refs/heads/main by this push:
new 004e232 Update scripts to support TPC-DS (#12)
004e232 is described below
commit 004e2325181de2ec73b9dbdae09dd954999596de
Author: Andy Grove <[email protected]>
AuthorDate: Tue Aug 20 16:18:16 2024 -0600
Update scripts to support TPC-DS (#12)
* Update scripts to support TPC-DS
* output path
* fix typo
* Update runners/datafusion-comet/README.md
Co-authored-by: Liang-Chi Hsieh <[email protected]>
* remove duplicate line
* remove duplicate line
---------
Co-authored-by: Liang-Chi Hsieh <[email protected]>
---
runners/datafusion-comet/README.md | 11 ++++--
runners/datafusion-comet/tpcbench.py | 15 +++++---
scripts/generate-comparison.py | 66 +++++++++++++++++++++++------------
tpcds/queries-spark/q72_optimized.sql | 32 +++++++++++++++++
4 files changed, 93 insertions(+), 31 deletions(-)
diff --git a/runners/datafusion-comet/README.md
b/runners/datafusion-comet/README.md
index 1c6be98..0d1733b 100644
--- a/runners/datafusion-comet/README.md
+++ b/runners/datafusion-comet/README.md
@@ -23,7 +23,7 @@ Follow the [Comet
Installation](https://datafusion.apache.org/comet/user-guide/i
create a Comet JAR file and then set the `COMET_JAR` environment variable to
point to that jar file.
```shell
-export COMET_JAR=spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar
+export COMET_JAR=spark/target/comet-spark-spark3.4_2.12-0.2.0-SNAPSHOT.jar
```
Set up `SPARK_HOME` to point to the relevant Spark version and use
`spark-submit` to run the benchmark script.
@@ -40,15 +40,20 @@ $SPARK_HOME/bin/spark-submit \
--jars $COMET_JAR \
--conf spark.driver.extraClassPath=$COMET_JAR \
--conf spark.executor.extraClassPath=$COMET_JAR \
+ --conf spark.plugins=org.apache.spark.CometPlugin \
--conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
--conf spark.comet.enabled=true \
--conf spark.comet.exec.enabled=true \
--conf spark.comet.exec.all.enabled=true \
- --conf spark.comet.explainFallback.enabled=true \
+ --conf spark.comet.cast.allowIncompatible=true \
+ --conf spark.comet.exec.shuffle.enabled=true \
+ --conf spark.comet.exec.shuffle.mode=auto \
+ --conf
spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
\
tpcbench.py \
--benchmark tpch \
--data /path/to/parquet-data \
- --queries ../../tpch/queries/
+ --queries ../../tpch/queries/ \
+ --output .
```
When benchmarking Comet, we are generally interested in comparing the
performance of Spark with Comet disabled to
diff --git a/runners/datafusion-comet/tpcbench.py
b/runners/datafusion-comet/tpcbench.py
index 4e9b789..ed11938 100644
--- a/runners/datafusion-comet/tpcbench.py
+++ b/runners/datafusion-comet/tpcbench.py
@@ -21,7 +21,7 @@ import json
from pyspark.sql import SparkSession
import time
-def main(benchmark: str, data_path: str, query_path: str, iterations: int):
+def main(benchmark: str, data_path: str, query_path: str, iterations: int,
output: str):
# Initialize a SparkSession
spark = SparkSession.builder \
@@ -61,10 +61,14 @@ def main(benchmark: str, data_path: str, query_path: str,
iterations: int):
print(f"Starting iteration {iteration} of {iterations}")
for query in range(1, num_queries+1):
- spark.sparkContext.setJobDescription(f"TPC-H q{query}")
+ spark.sparkContext.setJobDescription(f"{benchmark} q{query}")
# read text file
- path = f"{query_path}/q{query}.sql"
+ if query == 72:
+ # use version with sensible join order
+ path = f"{query_path}/q{query}_optimized.sql"
+ else:
+ path = f"{query_path}/q{query}.sql"
print(f"Reading query {query} using path {path}")
with open(path, "r") as f:
text = f.read()
@@ -89,7 +93,7 @@ def main(benchmark: str, data_path: str, query_path: str,
iterations: int):
str = json.dumps(results, indent=4)
current_time_millis = int(datetime.now().timestamp() * 1000)
- results_path = f"spark-{benchmark}-{current_time_millis}.json"
+ results_path = f"{output}/spark-{benchmark}-{current_time_millis}.json"
print(f"Writing results to {results_path}")
with open(results_path, "w") as f:
f.write(str)
@@ -103,6 +107,7 @@ if __name__ == "__main__":
parser.add_argument("--data", required=True, help="Path to data files")
parser.add_argument("--queries", required=True, help="Path to query files")
parser.add_argument("--iterations", required=False, default="1", help="How
many iterations to run")
+ parser.add_argument("--output", required=True, help="Path to write output")
args = parser.parse_args()
- main(args.benchmark, args.data, args.queries, int(args.iterations))
\ No newline at end of file
+ main(args.benchmark, args.data, args.queries, int(args.iterations),
args.output)
diff --git a/scripts/generate-comparison.py b/scripts/generate-comparison.py
index b13904e..3f28af1 100644
--- a/scripts/generate-comparison.py
+++ b/scripts/generate-comparison.py
@@ -19,14 +19,13 @@ import argparse
import json
import matplotlib.pyplot as plt
import numpy as np
-import sys
def geomean(data):
return np.prod(data) ** (1 / len(data))
-def generate_query_speedup_chart(baseline, comparison, label1: str, label2:
str, benchmark: str):
+def generate_query_speedup_chart(baseline, comparison, label1: str, label2:
str, benchmark: str, title: str):
results = []
- for query in range(1, 23):
+ for query in range(1, query_count(benchmark)+1):
a = np.median(np.array(baseline[str(query)]))
b = np.median(np.array(comparison[str(query)]))
if a > b:
@@ -40,7 +39,10 @@ def generate_query_speedup_chart(baseline, comparison,
label1: str, label2: str,
queries, speedups = zip(*results)
# Create figure and axis
- fig, ax = plt.subplots(figsize=(10, 6))
+ if benchmark == "tpch":
+ fig, ax = plt.subplots(figsize=(10, 6))
+ else:
+ fig, ax = plt.subplots(figsize=(35, 10))
# Create bar chart
bars = ax.bar(queries, speedups, color='skyblue')
@@ -56,7 +58,7 @@ def generate_query_speedup_chart(baseline, comparison,
label1: str, label2: str,
color='blue', rotation=90)
# Add title and labels
- ax.set_title(label2 + " speedup over " + label1 + " (" + benchmark + ")")
+ ax.set_title(label2 + " speedup over " + label1 + " (" + title + ")")
ax.set_ylabel('Speedup (100% speedup = 2x faster)')
ax.set_xlabel('Query')
@@ -64,21 +66,25 @@ def generate_query_speedup_chart(baseline, comparison,
label1: str, label2: str,
ax.axhline(0, color='black', linewidth=0.8)
min_value = (min(speedups) // 100) * 100
max_value = ((max(speedups) // 100) + 1) * 100 + 50
- ax.set_ylim(min_value, max_value)
+ if benchmark == "tpch":
+ ax.set_ylim(min_value, max_value)
+ else:
+ # TODO improve this
+ ax.set_ylim(-250, 300)
# Show grid for better readability
ax.yaxis.grid(True)
# Save the plot as an image file
- plt.savefig('tpch_queries_speedup.png', format='png')
+ plt.savefig(f'{benchmark}_queries_speedup.png', format='png')
-def generate_query_comparison_chart(results, labels, benchmark: str):
+def generate_query_comparison_chart(results, labels, benchmark: str, title:
str):
queries = []
benches = []
for _ in results:
benches.append([])
- for query in range(1, 23):
+ for query in range(1, query_count(benchmark)+1):
queries.append("q" + str(query))
for i in range(0, len(results)):
benches[i].append(np.median(np.array(results[i][str(query)])))
@@ -90,12 +96,16 @@ def generate_query_comparison_chart(results, labels,
benchmark: str):
index = np.arange(len(queries)) * 1.5
# Create a bar chart
- fig, ax = plt.subplots(figsize=(15, 6))
+ if benchmark == "tpch":
+ fig, ax = plt.subplots(figsize=(15, 6))
+ else:
+ fig, ax = plt.subplots(figsize=(35, 6))
+
for i in range(0, len(results)):
bar = ax.bar(index + i * bar_width, benches[i], bar_width,
label=labels[i])
# Add labels, title, and legend
- ax.set_title(benchmark)
+ ax.set_title(title)
ax.set_xlabel('Queries')
ax.set_ylabel('Query Time (seconds)')
ax.set_xticks(index + bar_width / 2)
@@ -103,14 +113,15 @@ def generate_query_comparison_chart(results, labels,
benchmark: str):
ax.legend()
# Save the plot as an image file
- plt.savefig('tpch_queries_compare.png', format='png')
+ plt.savefig(f'{benchmark}_queries_compare.png', format='png')
-def generate_summary(results, labels, benchmark: str):
+def generate_summary(results, labels, benchmark: str, title: str):
timings = []
for _ in results:
timings.append(0)
- for query in range(1, 23):
+ num_queries = query_count(benchmark)
+ for query in range(1, num_queries + 1):
for i in range(0, len(results)):
timings[i] += np.median(np.array(results[i][str(query)]))
@@ -118,8 +129,8 @@ def generate_summary(results, labels, benchmark: str):
fig, ax = plt.subplots()
# Add title and labels
- ax.set_title(benchmark)
- ax.set_ylabel('Time in seconds to run all 22 TPC-H queries (lower is
better)')
+ ax.set_title(title)
+ ax.set_ylabel(f'Time in seconds to run all {num_queries} {benchmark}
queries (lower is better)')
times = [round(x,0) for x in timings]
@@ -131,22 +142,31 @@ def generate_summary(results, labels, benchmark: str):
yval = bar.get_height()
ax.text(bar.get_x() + bar.get_width() / 2.0, yval, f'{yval}',
va='bottom') # va: vertical alignment
- plt.savefig('tpch_allqueries.png', format='png')
+ plt.savefig(f'{benchmark}_allqueries.png', format='png')
+
+def query_count(benchmark: str):
+ if benchmark == "tpch":
+ return 22
+ elif benchmark == "tpcds":
+ return 99
+    else:
+        raise ValueError("invalid benchmark name")
-def main(files, labels, benchmark: str):
+def main(files, labels, benchmark: str, title: str):
results = []
for filename in files:
with open(filename) as f:
results.append(json.load(f))
- generate_summary(results, labels, benchmark)
- generate_query_comparison_chart(results, labels, benchmark)
+ generate_summary(results, labels, benchmark, title)
+ generate_query_comparison_chart(results, labels, benchmark, title)
if len(files) == 2:
- generate_query_speedup_chart(results[0], results[1], labels[0],
labels[1], benchmark)
+ generate_query_speedup_chart(results[0], results[1], labels[0],
labels[1], benchmark, title)
if __name__ == '__main__':
argparse = argparse.ArgumentParser(description='Generate comparison')
argparse.add_argument('filenames', nargs='+', type=str, help='JSON result
files')
argparse.add_argument('--labels', nargs='+', type=str, help='Labels')
- argparse.add_argument('--benchmark', type=str, help='Benchmark
description')
+ argparse.add_argument('--benchmark', type=str, help='Benchmark name (tpch
or tpcds)')
+ argparse.add_argument('--title', type=str, help='Chart title')
args = argparse.parse_args()
- main(args.filenames, args.labels, args.benchmark)
+ main(args.filenames, args.labels, args.benchmark, args.title)
diff --git a/tpcds/queries-spark/q72_optimized.sql
b/tpcds/queries-spark/q72_optimized.sql
new file mode 100644
index 0000000..a98a70e
--- /dev/null
+++ b/tpcds/queries-spark/q72_optimized.sql
@@ -0,0 +1,32 @@
+-- SQLBench-DS query 72 derived from TPC-DS query 72 under the terms of the
TPC Fair Use Policy.
+-- TPC-DS queries are Copyright 2021 Transaction Processing Performance
Council.
+
+-- This is a modified version of q72 that changes the join order to be
sensible (the original q72
+-- intentionally has a terrible join order to test database vendors'
join-reordering rules)
+
+select i_item_desc
+ ,w_warehouse_name
+ ,d1.d_week_seq
+ ,sum(case when p_promo_sk is null then 1 else 0 end) no_promo
+ ,sum(case when p_promo_sk is not null then 1 else 0 end) promo
+ ,count(*) total_cnt
+from catalog_sales
+ join date_dim d1 on (cs_sold_date_sk = d1.d_date_sk)
+ join customer_demographics on (cs_bill_cdemo_sk = cd_demo_sk)
+ join household_demographics on (cs_bill_hdemo_sk = hd_demo_sk)
+ join item on (i_item_sk = cs_item_sk)
+ join inventory on (cs_item_sk = inv_item_sk)
+ join warehouse on (w_warehouse_sk=inv_warehouse_sk)
+ join date_dim d2 on (inv_date_sk = d2.d_date_sk)
+ join date_dim d3 on (cs_ship_date_sk = d3.d_date_sk)
+ left outer join promotion on (cs_promo_sk=p_promo_sk)
+ left outer join catalog_returns on (cr_item_sk = cs_item_sk and
cr_order_number = cs_order_number)
+where d1.d_week_seq = d2.d_week_seq
+ and inv_quantity_on_hand < cs_quantity
+ and d3.d_date > d1.d_date + 5
+ and hd_buy_potential = '501-1000'
+ and d1.d_year = 1999
+ and cd_marital_status = 'S'
+group by i_item_desc,w_warehouse_name,d1.d_week_seq
+order by total_cnt desc, i_item_desc, w_warehouse_name, d_week_seq
+LIMIT 100;
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]