This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-benchmarks.git


The following commit(s) were added to refs/heads/main by this push:
     new 004e232  Update scripts to support TPC-DS (#12)
004e232 is described below

commit 004e2325181de2ec73b9dbdae09dd954999596de
Author: Andy Grove <[email protected]>
AuthorDate: Tue Aug 20 16:18:16 2024 -0600

    Update scripts to support TPC-DS (#12)
    
    * Update scripts to support TPC-DS
    
    * output path
    
    * fix typo
    
    * Update runners/datafusion-comet/README.md
    
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
    
    * remove duplicate line
    
    * remove duplicate line
    
    ---------
    
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
---
 runners/datafusion-comet/README.md    | 11 ++++--
 runners/datafusion-comet/tpcbench.py  | 15 +++++---
 scripts/generate-comparison.py        | 66 +++++++++++++++++++++++------------
 tpcds/queries-spark/q72_optimized.sql | 32 +++++++++++++++++
 4 files changed, 93 insertions(+), 31 deletions(-)

diff --git a/runners/datafusion-comet/README.md b/runners/datafusion-comet/README.md
index 1c6be98..0d1733b 100644
--- a/runners/datafusion-comet/README.md
+++ b/runners/datafusion-comet/README.md
@@ -23,7 +23,7 @@ Follow the [Comet Installation](https://datafusion.apache.org/comet/user-guide/i
 create a Comet JAR file and then set the `COMET_JAR` environment variable to point to that jar file.
 
 ```shell
-export COMET_JAR=spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar
+export COMET_JAR=spark/target/comet-spark-spark3.4_2.12-0.2.0-SNAPSHOT.jar
 ```
 
 Set up `SPARK_HOME` to point to the relevant Spark version and use `spark-submit` to run the benchmark script.
@@ -40,15 +40,20 @@ $SPARK_HOME/bin/spark-submit \
     --jars $COMET_JAR \
     --conf spark.driver.extraClassPath=$COMET_JAR \
     --conf spark.executor.extraClassPath=$COMET_JAR \
+    --conf spark.plugins=org.apache.spark.CometPlugin \
     --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
     --conf spark.comet.enabled=true \
     --conf spark.comet.exec.enabled=true \
     --conf spark.comet.exec.all.enabled=true \
-    --conf spark.comet.explainFallback.enabled=true \
+    --conf spark.comet.cast.allowIncompatible=true \
+    --conf spark.comet.exec.shuffle.enabled=true \
+    --conf spark.comet.exec.shuffle.mode=auto \
+    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
     tpcbench.py \
     --benchmark tpch \
     --data /path/to/parquet-data \
-    --queries ../../tpch/queries/
+    --queries ../../tpch/queries/ \
+    --output .
 ```
 
 When benchmarking Comet, we are generally interested in comparing the performance of Spark with Comet disabled to
diff --git a/runners/datafusion-comet/tpcbench.py b/runners/datafusion-comet/tpcbench.py
index 4e9b789..ed11938 100644
--- a/runners/datafusion-comet/tpcbench.py
+++ b/runners/datafusion-comet/tpcbench.py
@@ -21,7 +21,7 @@ import json
 from pyspark.sql import SparkSession
 import time
 
-def main(benchmark: str, data_path: str, query_path: str, iterations: int):
+def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str):
 
     # Initialize a SparkSession
     spark = SparkSession.builder \
@@ -61,10 +61,14 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int):
         print(f"Starting iteration {iteration} of {iterations}")
 
         for query in range(1, num_queries+1):
-            spark.sparkContext.setJobDescription(f"TPC-H q{query}")
+            spark.sparkContext.setJobDescription(f"{benchmark} q{query}")
 
             # read text file
-            path = f"{query_path}/q{query}.sql"
+            if query == 72:
+                # use version with sensible join order
+                path = f"{query_path}/q{query}_optimized.sql"
+            else:
+                path = f"{query_path}/q{query}.sql"
             print(f"Reading query {query} using path {path}")
             with open(path, "r") as f:
                 text = f.read()
@@ -89,7 +93,7 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int):
 
     str = json.dumps(results, indent=4)
     current_time_millis = int(datetime.now().timestamp() * 1000)
-    results_path = f"spark-{benchmark}-{current_time_millis}.json"
+    results_path = f"{output}/spark-{benchmark}-{current_time_millis}.json"
     print(f"Writing results to {results_path}")
     with open(results_path, "w") as f:
         f.write(str)
@@ -103,6 +107,7 @@ if __name__ == "__main__":
     parser.add_argument("--data", required=True, help="Path to data files")
     parser.add_argument("--queries", required=True, help="Path to query files")
     parser.add_argument("--iterations", required=False, default="1", help="How many iterations to run")
+    parser.add_argument("--output", required=True, help="Path to write output")
     args = parser.parse_args()
 
-    main(args.benchmark, args.data, args.queries, int(args.iterations))
\ No newline at end of file
+    main(args.benchmark, args.data, args.queries, int(args.iterations), args.output)
diff --git a/scripts/generate-comparison.py b/scripts/generate-comparison.py
index b13904e..3f28af1 100644
--- a/scripts/generate-comparison.py
+++ b/scripts/generate-comparison.py
@@ -19,14 +19,13 @@ import argparse
 import json
 import matplotlib.pyplot as plt
 import numpy as np
-import sys
 
 def geomean(data):
     return np.prod(data) ** (1 / len(data))
 
-def generate_query_speedup_chart(baseline, comparison, label1: str, label2: str, benchmark: str):
+def generate_query_speedup_chart(baseline, comparison, label1: str, label2: str, benchmark: str, title: str):
     results = []
-    for query in range(1, 23):
+    for query in range(1, query_count(benchmark)+1):
         a = np.median(np.array(baseline[str(query)]))
         b = np.median(np.array(comparison[str(query)]))
         if a > b:
@@ -40,7 +39,10 @@ def generate_query_speedup_chart(baseline, comparison, label1: str, label2: str,
     queries, speedups = zip(*results)
 
     # Create figure and axis
-    fig, ax = plt.subplots(figsize=(10, 6))
+    if benchmark == "tpch":
+        fig, ax = plt.subplots(figsize=(10, 6))
+    else:
+        fig, ax = plt.subplots(figsize=(35, 10))
 
     # Create bar chart
     bars = ax.bar(queries, speedups, color='skyblue')
@@ -56,7 +58,7 @@ def generate_query_speedup_chart(baseline, comparison, label1: str, label2: str,
                     color='blue', rotation=90)
 
     # Add title and labels
-    ax.set_title(label2 + " speedup over " + label1 + " (" + benchmark + ")")
+    ax.set_title(label2 + " speedup over " + label1 + " (" + title + ")")
     ax.set_ylabel('Speedup (100% speedup = 2x faster)')
     ax.set_xlabel('Query')
 
@@ -64,21 +66,25 @@ def generate_query_speedup_chart(baseline, comparison, label1: str, label2: str,
     ax.axhline(0, color='black', linewidth=0.8)
     min_value = (min(speedups) // 100) * 100
     max_value = ((max(speedups) // 100) + 1) * 100 + 50
-    ax.set_ylim(min_value, max_value)
+    if benchmark == "tpch":
+        ax.set_ylim(min_value, max_value)
+    else:
+        # TODO improve this
+        ax.set_ylim(-250, 300)
 
     # Show grid for better readability
     ax.yaxis.grid(True)
 
     # Save the plot as an image file
-    plt.savefig('tpch_queries_speedup.png', format='png')
+    plt.savefig(f'{benchmark}_queries_speedup.png', format='png')
 
 
-def generate_query_comparison_chart(results, labels, benchmark: str):
+def generate_query_comparison_chart(results, labels, benchmark: str, title: str):
     queries = []
     benches = []
     for _ in results:
         benches.append([])
-    for query in range(1, 23):
+    for query in range(1, query_count(benchmark)+1):
         queries.append("q" + str(query))
         for i in range(0, len(results)):
             benches[i].append(np.median(np.array(results[i][str(query)])))
@@ -90,12 +96,16 @@ def generate_query_comparison_chart(results, labels, benchmark: str):
     index = np.arange(len(queries)) * 1.5
 
     # Create a bar chart
-    fig, ax = plt.subplots(figsize=(15, 6))
+    if benchmark == "tpch":
+        fig, ax = plt.subplots(figsize=(15, 6))
+    else:
+        fig, ax = plt.subplots(figsize=(35, 6))
+
     for i in range(0, len(results)):
         bar = ax.bar(index + i * bar_width, benches[i], bar_width, label=labels[i])
 
     # Add labels, title, and legend
-    ax.set_title(benchmark)
+    ax.set_title(title)
     ax.set_xlabel('Queries')
     ax.set_ylabel('Query Time (seconds)')
     ax.set_xticks(index + bar_width / 2)
@@ -103,14 +113,15 @@ def generate_query_comparison_chart(results, labels, benchmark: str):
     ax.legend()
 
     # Save the plot as an image file
-    plt.savefig('tpch_queries_compare.png', format='png')
+    plt.savefig(f'{benchmark}_queries_compare.png', format='png')
 
-def generate_summary(results, labels, benchmark: str):
+def generate_summary(results, labels, benchmark: str, title: str):
     timings = []
     for _ in results:
         timings.append(0)
 
-    for query in range(1, 23):
+    num_queries = query_count(benchmark)
+    for query in range(1, num_queries + 1):
         for i in range(0, len(results)):
             timings[i] += np.median(np.array(results[i][str(query)]))
 
@@ -118,8 +129,8 @@ def generate_summary(results, labels, benchmark: str):
     fig, ax = plt.subplots()
 
     # Add title and labels
-    ax.set_title(benchmark)
-    ax.set_ylabel('Time in seconds to run all 22 TPC-H queries (lower is better)')
+    ax.set_title(title)
+    ax.set_ylabel(f'Time in seconds to run all {num_queries} {benchmark} queries (lower is better)')
 
     times = [round(x,0) for x in timings]
 
@@ -131,22 +142,31 @@ def generate_summary(results, labels, benchmark: str):
         yval = bar.get_height()
         ax.text(bar.get_x() + bar.get_width() / 2.0, yval, f'{yval}', va='bottom')  # va: vertical alignment
 
-    plt.savefig('tpch_allqueries.png', format='png')
+    plt.savefig(f'{benchmark}_allqueries.png', format='png')
+
+def query_count(benchmark: str):
+    if benchmark == "tpch":
+        return 22
+    elif benchmark == "tpcds":
+        return 99
+    else:
+        raise "invalid benchmark name"
 
-def main(files, labels, benchmark: str):
+def main(files, labels, benchmark: str, title: str):
     results = []
     for filename in files:
         with open(filename) as f:
             results.append(json.load(f))
-    generate_summary(results, labels, benchmark)
-    generate_query_comparison_chart(results, labels, benchmark)
+    generate_summary(results, labels, benchmark, title)
+    generate_query_comparison_chart(results, labels, benchmark, title)
     if len(files) == 2:
-        generate_query_speedup_chart(results[0], results[1], labels[0], labels[1], benchmark)
+        generate_query_speedup_chart(results[0], results[1], labels[0], labels[1], benchmark, title)
 
 if __name__ == '__main__':
     argparse = argparse.ArgumentParser(description='Generate comparison')
     argparse.add_argument('filenames', nargs='+', type=str, help='JSON result files')
     argparse.add_argument('--labels', nargs='+', type=str, help='Labels')
-    argparse.add_argument('--benchmark', type=str, help='Benchmark description')
+    argparse.add_argument('--benchmark', type=str, help='Benchmark name (tpch or tpcds)')
+    argparse.add_argument('--title', type=str, help='Chart title')
     args = argparse.parse_args()
-    main(args.filenames, args.labels, args.benchmark)
+    main(args.filenames, args.labels, args.benchmark, args.title)
diff --git a/tpcds/queries-spark/q72_optimized.sql b/tpcds/queries-spark/q72_optimized.sql
new file mode 100644
index 0000000..a98a70e
--- /dev/null
+++ b/tpcds/queries-spark/q72_optimized.sql
@@ -0,0 +1,32 @@
+-- SQLBench-DS query 72 derived from TPC-DS query 72 under the terms of the TPC Fair Use Policy.
+-- TPC-DS queries are Copyright 2021 Transaction Processing Performance Council.
+
+-- This is a modified version of q72 that changes the join order to be sensible (the original q72
+-- intentionally has a terrible join order for testing database vendors join reordering rules)
+
+select  i_item_desc
+     ,w_warehouse_name
+     ,d1.d_week_seq
+     ,sum(case when p_promo_sk is null then 1 else 0 end) no_promo
+     ,sum(case when p_promo_sk is not null then 1 else 0 end) promo
+     ,count(*) total_cnt
+from catalog_sales
+         join date_dim d1 on (cs_sold_date_sk = d1.d_date_sk)
+         join customer_demographics on (cs_bill_cdemo_sk = cd_demo_sk)
+         join household_demographics on (cs_bill_hdemo_sk = hd_demo_sk)
+         join item on (i_item_sk = cs_item_sk)
+         join inventory on (cs_item_sk = inv_item_sk)
+         join warehouse on (w_warehouse_sk=inv_warehouse_sk)
+         join date_dim d2 on (inv_date_sk = d2.d_date_sk)
+         join date_dim d3 on (cs_ship_date_sk = d3.d_date_sk)
+         left outer join promotion on (cs_promo_sk=p_promo_sk)
+         left outer join catalog_returns on (cr_item_sk = cs_item_sk and cr_order_number = cs_order_number)
+where d1.d_week_seq = d2.d_week_seq
+  and inv_quantity_on_hand < cs_quantity
+  and d3.d_date > d1.d_date + 5
+  and hd_buy_potential = '501-1000'
+  and d1.d_year = 1999
+  and cd_marital_status = 'S'
+group by i_item_desc,w_warehouse_name,d1.d_week_seq
+order by total_cnt desc, i_item_desc, w_warehouse_name, d_week_seq
+LIMIT 100;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to