This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new cf8cba0bc chore: Add PySpark-based benchmarks, starting with ETL 
example (#3065)
cf8cba0bc is described below

commit cf8cba0bc5d6bff42a0fb1ff1b0699b3b1f617ab
Author: Andy Grove <[email protected]>
AuthorDate: Mon Jan 12 13:04:39 2026 -0700

    chore: Add PySpark-based benchmarks, starting with ETL example (#3065)
---
 benchmarks/pyspark/README.md             |  97 +++++++
 benchmarks/pyspark/generate_data.py      | 446 +++++++++++++++++++++++++++++++
 benchmarks/pyspark/run_all_benchmarks.sh | 118 ++++++++
 benchmarks/pyspark/run_benchmark.py      | 104 +++++++
 4 files changed, 765 insertions(+)

diff --git a/benchmarks/pyspark/README.md b/benchmarks/pyspark/README.md
new file mode 100644
index 000000000..130870081
--- /dev/null
+++ b/benchmarks/pyspark/README.md
@@ -0,0 +1,97 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Shuffle Size Comparison Benchmark
+
+Compares shuffle file sizes between Spark, Comet JVM, and Comet Native shuffle 
implementations.
+
+## Prerequisites
+
+- Apache Spark cluster (standalone, YARN, or Kubernetes)
+- `SPARK_HOME` set to the Spark installation (required by `run_all_benchmarks.sh`)
+- PySpark installed
+- Comet JAR built
+
+## Build Comet JAR
+
+```bash
+cd /path/to/datafusion-comet
+make release
+```
+
+## Step 1: Generate Test Data
+
+Generate test data with a realistic 100-column schema (deeply nested structs, arrays, and maps):
+
+```bash
+spark-submit \
+  --master spark://master:7077 \
+  --executor-memory 16g \
+  generate_data.py \
+  --output /tmp/shuffle-benchmark-data \
+  --rows 10000000 \
+  --partitions 200
+```
+
+### Data Generation Options
+
+| Option               | Default    | Description                  |
+| -------------------- | ---------- | ---------------------------- |
+| `--output`, `-o`     | (required) | Output path for parquet data |
+| `--rows`, `-r`       | 10000000   | Number of rows               |
+| `--partitions`, `-p` | 200        | Number of output partitions  |
+
+## Step 2: Run Benchmark
+
+Run benchmarks and check Spark UI for shuffle sizes:
+
+```bash
+SPARK_MASTER=spark://master:7077 \
+EXECUTOR_MEMORY=16g \
+./run_all_benchmarks.sh /tmp/shuffle-benchmark-data
+```
+
+Or run individual modes:
+
+```bash
+# Spark baseline
+spark-submit --master spark://master:7077 \
+  run_benchmark.py --data /tmp/shuffle-benchmark-data --mode spark
+
+# Comet JVM shuffle
+spark-submit --master spark://master:7077 \
+  --jars /path/to/comet.jar \
+  --conf spark.comet.enabled=true \
+  --conf spark.comet.exec.shuffle.enabled=true \
+  --conf spark.comet.exec.shuffle.mode=jvm \
+  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+  run_benchmark.py --data /tmp/shuffle-benchmark-data --mode jvm
+
+# Comet Native shuffle
+spark-submit --master spark://master:7077 \
+  --jars /path/to/comet.jar \
+  --conf spark.comet.enabled=true \
+  --conf spark.comet.exec.shuffle.enabled=true \
+  --conf spark.comet.exec.shuffle.mode=native \
+  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+  run_benchmark.py --data /tmp/shuffle-benchmark-data --mode native
+```
+
+## Checking Results
+
+Open the Spark UI (default: http://localhost:4040) during each benchmark run 
to compare shuffle write sizes in the Stages tab.
diff --git a/benchmarks/pyspark/generate_data.py 
b/benchmarks/pyspark/generate_data.py
new file mode 100755
index 000000000..d8be47d6e
--- /dev/null
+++ b/benchmarks/pyspark/generate_data.py
@@ -0,0 +1,446 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Generate test data for shuffle size comparison benchmark.
+
+This script generates a parquet dataset with a realistic schema (100 columns
+including deeply nested structs, arrays, and maps) for benchmarking shuffle
+operations across Spark, Comet JVM, and Comet Native shuffle modes.
+"""
+
+import argparse
+from pyspark.sql import SparkSession
+from pyspark.sql import functions as F
+from pyspark.sql.types import (
+    StructType, StructField, IntegerType, LongType, DoubleType,
+    StringType, BooleanType, DateType, TimestampType, ArrayType,
+    MapType, DecimalType
+)
+
+
+def generate_data(output_path: str, num_rows: int, num_partitions: int):
+    """Generate test data with realistic schema and write to parquet."""
+
+    spark = SparkSession.builder \
+        .appName("ShuffleBenchmark-DataGen") \
+        .getOrCreate()
+
+    print(f"Generating {num_rows:,} rows with {num_partitions} partitions")
+    print(f"Output path: {output_path}")
+    print("Schema: 100 columns including deeply nested structs, arrays, and 
maps")
+
+    # Start with a range and build up the columns
+    df = spark.range(0, num_rows, numPartitions=num_partitions)
+
+    # Add columns using selectExpr for better performance
+    df = df.selectExpr(
+        # Key columns for grouping/partitioning (1-3)
+        "cast(id % 1000 as int) as partition_key",
+        "cast(id % 100 as int) as group_key",
+        "id as row_id",
+
+        # Integer columns (4-15)
+        "cast(id % 10000 as int) as category_id",
+        "cast(id % 500 as int) as region_id",
+        "cast(id % 50 as int) as department_id",
+        "cast((id * 7) % 1000000 as int) as customer_id",
+        "cast((id * 13) % 100000 as int) as product_id",
+        "cast(id % 12 + 1 as int) as month",
+        "cast(id % 28 + 1 as int) as day",
+        "cast(2020 + (id % 5) as int) as year",
+        "cast((id * 17) % 256 as int) as priority",
+        "cast((id * 19) % 1000 as int) as rank",
+        "cast((id * 23) % 10000 as int) as score_int",
+        "cast((id * 29) % 500 as int) as level",
+
+        # Long columns (16-22)
+        "id * 1000 as transaction_id",
+        "(id * 17) % 10000000000 as account_number",
+        "(id * 31) % 1000000000 as reference_id",
+        "(id * 37) % 10000000000 as external_id",
+        "(id * 41) % 1000000000 as correlation_id",
+        "(id * 43) % 10000000000 as trace_id",
+        "(id * 47) % 1000000000 as span_id",
+
+        # Double columns (23-35)
+        "cast(id % 10000 as double) / 100.0 as amount",
+        "cast((id * 3) % 10000 as double) / 100.0 as price",
+        "cast(id % 100 as double) / 100.0 as discount",
+        "cast((id * 7) % 500 as double) / 10.0 as weight",
+        "cast((id * 11) % 1000 as double) / 10.0 as height",
+        "cast(id % 360 as double) as latitude",
+        "cast((id * 2) % 360 as double) as longitude",
+        "cast((id * 13) % 10000 as double) / 1000.0 as rate",
+        "cast((id * 17) % 100 as double) / 100.0 as percentage",
+        "cast((id * 19) % 1000 as double) as velocity",
+        "cast((id * 23) % 500 as double) / 10.0 as acceleration",
+        "cast((id * 29) % 10000 as double) / 100.0 as temperature",
+        "cast((id * 31) % 1000 as double) / 10.0 as pressure",
+
+        # String columns (36-50)
+        "concat('user_', cast(id % 100000 as string)) as user_name",
+        "concat('email_', cast(id % 50000 as string), '@example.com') as 
email",
+        "concat('SKU-', lpad(cast(id % 10000 as string), 6, '0')) as sku",
+        "concat('ORD-', cast(id as string)) as order_id",
+        "array('pending', 'processing', 'shipped', 'delivered', 
'cancelled')[cast(id % 5 as int)] as status",
+        "array('USD', 'EUR', 'GBP', 'JPY', 'CAD')[cast(id % 5 as int)] as 
currency",
+        "concat('Description for item ', cast(id % 1000 as string), ' with 
additional details') as description",
+        "concat('REF-', lpad(cast(id % 100000 as string), 8, '0')) as 
reference_code",
+        "concat('TXN-', cast(id as string), '-', cast(id % 1000 as string)) as 
transaction_code",
+        "array('A', 'B', 'C', 'D', 'E')[cast(id % 5 as int)] as grade",
+        "concat('Note: Record ', cast(id as string), ' processed 
successfully') as notes",
+        "concat('Session-', lpad(cast(id % 10000 as string), 6, '0')) as 
session_id",
+        "concat('Device-', cast(id % 1000 as string)) as device_id",
+        "array('chrome', 'firefox', 'safari', 'edge')[cast(id % 4 as int)] as 
browser",
+        "array('windows', 'macos', 'linux', 'ios', 'android')[cast(id % 5 as 
int)] as os",
+
+        # Boolean columns (51-56)
+        "id % 2 = 0 as is_active",
+        "id % 3 = 0 as is_verified",
+        "id % 7 = 0 as is_premium",
+        "id % 5 = 0 as is_deleted",
+        "id % 11 = 0 as is_featured",
+        "id % 13 = 0 as is_archived",
+
+        # Date and timestamp columns (57-60)
+        "date_add(to_date('2020-01-01'), cast(id % 1500 as int)) as 
created_date",
+        "date_add(to_date('2020-01-01'), cast((id + 30) % 1500 as int)) as 
updated_date",
+        "date_add(to_date('2020-01-01'), cast((id + 60) % 1500 as int)) as 
expires_date",
+        "to_timestamp(concat('2020-01-01 ', lpad(cast(id % 24 as string), 2, 
'0'), ':00:00')) as created_at",
+
+        # Simple arrays (61-65)
+        "array(cast(id % 100 as int), cast((id + 1) % 100 as int), cast((id + 
2) % 100 as int), cast((id + 3) % 100 as int), cast((id + 4) % 100 as int)) as 
tag_ids",
+        "array(cast(id % 1000 as double) / 10.0, cast((id * 2) % 1000 as 
double) / 10.0, cast((id * 3) % 1000 as double) / 10.0) as scores",
+        "array(concat('tag_', cast(id % 20 as string)), concat('tag_', 
cast((id + 5) % 20 as string)), concat('tag_', cast((id + 10) % 20 as string))) 
as tags",
+        "array(id % 2 = 0, id % 3 = 0, id % 5 = 0, id % 7 = 0) as flag_array",
+        "array(id * 1000, id * 2000, id * 3000) as long_array",
+
+        # Simple maps (66-68)
+        "map('key1', cast(id % 100 as string), 'key2', cast((id * 2) % 100 as 
string), 'key3', cast((id * 3) % 100 as string)) as str_attributes",
+        "map('score1', cast(id % 100 as double), 'score2', cast((id * 2) % 100 
as double)) as double_attributes",
+        "map(cast(id % 10 as int), concat('val_', cast(id % 100 as string)), 
cast((id + 1) % 10 as int), concat('val_', cast((id + 1) % 100 as string))) as 
int_key_map",
+
+        # Level 2 nested struct: address with nested geo (69)
+        "named_struct("
+        "  'street', concat(cast(id % 9999 as string), ' Main St'),"
+        "  'city', array('New York', 'Los Angeles', 'Chicago', 'Houston', 
'Phoenix')[cast(id % 5 as int)],"
+        "  'state', array('NY', 'CA', 'IL', 'TX', 'AZ')[cast(id % 5 as int)],"
+        "  'zip', lpad(cast(id % 99999 as string), 5, '0'),"
+        "  'country', 'USA',"
+        "  'geo', named_struct("
+        "    'lat', cast(id % 180 as double) - 90.0,"
+        "    'lng', cast(id % 360 as double) - 180.0,"
+        "    'accuracy', cast(id % 100 as double)"
+        "  )"
+        ") as address",
+
+        # Level 3 nested struct: organization hierarchy (70)
+        "named_struct("
+        "  'company', named_struct("
+        "    'name', concat('Company_', cast(id % 1000 as string)),"
+        "    'industry', array('tech', 'finance', 'healthcare', 
'retail')[cast(id % 4 as int)],"
+        "    'headquarters', named_struct("
+        "      'city', array('NYC', 'SF', 'LA', 'CHI')[cast(id % 4 as int)],"
+        "      'country', 'USA',"
+        "      'timezone', array('EST', 'PST', 'PST', 'CST')[cast(id % 4 as 
int)]"
+        "    )"
+        "  ),"
+        "  'department', named_struct("
+        "    'name', array('Engineering', 'Sales', 'Marketing', 'HR')[cast(id 
% 4 as int)],"
+        "    'code', concat('DEPT-', cast(id % 100 as string)),"
+        "    'budget', cast(id % 1000000 as double)"
+        "  )"
+        ") as organization",
+
+        # Level 4 nested struct: deep config (71)
+        "named_struct("
+        "  'level1', named_struct("
+        "    'level2a', named_struct("
+        "      'level3a', named_struct("
+        "        'value_int', cast(id % 1000 as int),"
+        "        'value_str', concat('deep_', cast(id % 100 as string)),"
+        "        'value_bool', id % 2 = 0"
+        "      ),"
+        "      'level3b', named_struct("
+        "        'metric1', cast(id % 100 as double),"
+        "        'metric2', cast((id * 2) % 100 as double)"
+        "      )"
+        "    ),"
+        "    'level2b', named_struct("
+        "      'setting1', concat('setting_', cast(id % 50 as string)),"
+        "      'setting2', id % 3 = 0,"
+        "      'values', array(cast(id % 10 as int), cast((id + 1) % 10 as 
int), cast((id + 2) % 10 as int))"
+        "    )"
+        "  ),"
+        "  'metadata', named_struct("
+        "    'version', concat('v', cast(id % 10 as string)),"
+        "    'timestamp', id * 1000"
+        "  )"
+        ") as deep_config",
+
+        # Array of structs with nested structs (72)
+        "array("
+        "  named_struct("
+        "    'item_id', cast(id % 1000 as int),"
+        "    'details', named_struct("
+        "      'name', concat('Item_', cast(id % 100 as string)),"
+        "      'category', array('electronics', 'clothing', 'food', 
'books')[cast(id % 4 as int)],"
+        "      'pricing', named_struct("
+        "        'base', cast(id % 100 as double) + 0.99,"
+        "        'discount', cast(id % 20 as double) / 100.0,"
+        "        'tax_rate', 0.08"
+        "      )"
+        "    ),"
+        "    'quantity', cast(id % 10 + 1 as int)"
+        "  ),"
+        "  named_struct("
+        "    'item_id', cast((id + 100) % 1000 as int),"
+        "    'details', named_struct("
+        "      'name', concat('Item_', cast((id + 100) % 100 as string)),"
+        "      'category', array('electronics', 'clothing', 'food', 
'books')[cast((id + 1) % 4 as int)],"
+        "      'pricing', named_struct("
+        "        'base', cast((id + 50) % 100 as double) + 0.99,"
+        "        'discount', cast((id + 5) % 20 as double) / 100.0,"
+        "        'tax_rate', 0.08"
+        "      )"
+        "    ),"
+        "    'quantity', cast((id + 1) % 10 + 1 as int)"
+        "  )"
+        ") as line_items",
+
+        # Map with struct values (73)
+        "map("
+        "  'primary', named_struct('name', concat('Primary_', cast(id % 100 as 
string)), 'score', cast(id % 100 as double), 'active', true),"
+        "  'secondary', named_struct('name', concat('Secondary_', cast(id % 
100 as string)), 'score', cast((id * 2) % 100 as double), 'active', id % 2 = 0)"
+        ") as contact_map",
+
+        # Struct with map containing arrays (74)
+        "named_struct("
+        "  'config_name', concat('Config_', cast(id % 100 as string)),"
+        "  'settings', map("
+        "    'integers', array(cast(id % 10 as int), cast((id + 1) % 10 as 
int), cast((id + 2) % 10 as int)),"
+        "    'strings', array(concat('s1_', cast(id % 10 as string)), 
concat('s2_', cast(id % 10 as string)))"
+        "  ),"
+        "  'enabled', id % 2 = 0"
+        ") as config_with_map",
+
+        # Array of arrays (75)
+        "array("
+        "  array(cast(id % 10 as int), cast((id + 1) % 10 as int), cast((id + 
2) % 10 as int)),"
+        "  array(cast((id * 2) % 10 as int), cast((id * 2 + 1) % 10 as int)),"
+        "  array(cast((id * 3) % 10 as int), cast((id * 3 + 1) % 10 as int), 
cast((id * 3 + 2) % 10 as int), cast((id * 3 + 3) % 10 as int))"
+        ") as nested_int_arrays",
+
+        # Array of maps (76)
+        "array("
+        "  map('a', cast(id % 100 as string), 'b', cast((id + 1) % 100 as 
string)),"
+        "  map('x', cast((id * 2) % 100 as string), 'y', cast((id * 2 + 1) % 
100 as string), 'z', cast((id * 2 + 2) % 100 as string))"
+        ") as array_of_maps",
+
+        # Map with array values (77)
+        "map("
+        "  'scores', array(cast(id % 100 as double), cast((id * 2) % 100 as 
double), cast((id * 3) % 100 as double)),"
+        "  'ranks', array(cast(id % 10 as double), cast((id + 1) % 10 as 
double))"
+        ") as map_with_arrays",
+
+        # Complex event structure (78)
+        "named_struct("
+        "  'event_id', concat('EVT-', cast(id as string)),"
+        "  'event_type', array('click', 'view', 'purchase', 'signup')[cast(id 
% 4 as int)],"
+        "  'timestamp', id * 1000,"
+        "  'properties', map("
+        "    'source', array('web', 'mobile', 'api')[cast(id % 3 as int)],"
+        "    'campaign', concat('camp_', cast(id % 50 as string))"
+        "  ),"
+        "  'user', named_struct("
+        "    'id', cast(id % 100000 as int),"
+        "    'segment', array('new', 'returning', 'premium')[cast(id % 3 as 
int)],"
+        "    'attributes', named_struct("
+        "      'age_group', array('18-24', '25-34', '35-44', '45+')[cast(id % 
4 as int)],"
+        "      'interests', array(concat('int_', cast(id % 10 as string)), 
concat('int_', cast((id + 1) % 10 as string)))"
+        "    )"
+        "  )"
+        ") as event_data",
+
+        # Financial transaction with deep nesting (79)
+        "named_struct("
+        "  'txn_id', concat('TXN-', cast(id as string)),"
+        "  'amount', named_struct("
+        "    'value', cast(id % 10000 as double) / 100.0,"
+        "    'currency', array('USD', 'EUR', 'GBP')[cast(id % 3 as int)],"
+        "    'exchange', named_struct("
+        "      'rate', 1.0 + cast(id % 100 as double) / 1000.0,"
+        "      'source', 'market',"
+        "      'timestamp', id * 1000"
+        "    )"
+        "  ),"
+        "  'parties', named_struct("
+        "    'sender', named_struct("
+        "      'account', concat('ACC-', lpad(cast(id % 100000 as string), 8, 
'0')),"
+        "      'bank', named_struct("
+        "        'code', concat('BNK-', cast(id % 100 as string)),"
+        "        'country', array('US', 'UK', 'DE', 'JP')[cast(id % 4 as int)]"
+        "      )"
+        "    ),"
+        "    'receiver', named_struct("
+        "      'account', concat('ACC-', lpad(cast((id + 50000) % 100000 as 
string), 8, '0')),"
+        "      'bank', named_struct("
+        "        'code', concat('BNK-', cast((id + 50) % 100 as string)),"
+        "        'country', array('US', 'UK', 'DE', 'JP')[cast((id + 1) % 4 as 
int)]"
+        "      )"
+        "    )"
+        "  )"
+        ") as financial_txn",
+
+        # Product catalog entry (80)
+        "named_struct("
+        "  'product_id', concat('PROD-', lpad(cast(id % 10000 as string), 6, 
'0')),"
+        "  'variants', array("
+        "    named_struct("
+        "      'sku', concat('VAR-', cast(id % 1000 as string), '-A'),"
+        "      'attributes', map('color', 'red', 'size', 'S'),"
+        "      'inventory', named_struct('quantity', cast(id % 100 as int), 
'warehouse', 'WH-1')"
+        "    ),"
+        "    named_struct("
+        "      'sku', concat('VAR-', cast(id % 1000 as string), '-B'),"
+        "      'attributes', map('color', 'blue', 'size', 'M'),"
+        "      'inventory', named_struct('quantity', cast((id + 10) % 100 as 
int), 'warehouse', 'WH-2')"
+        "    )"
+        "  ),"
+        "  'pricing', named_struct("
+        "    'list_price', cast(id % 1000 as double) + 0.99,"
+        "    'tiers', array("
+        "      named_struct('min_qty', 1, 'price', cast(id % 1000 as double) + 
0.99),"
+        "      named_struct('min_qty', 10, 'price', cast(id % 1000 as double) 
* 0.9 + 0.99),"
+        "      named_struct('min_qty', 100, 'price', cast(id % 1000 as double) 
* 0.8 + 0.99)"
+        "    )"
+        "  )"
+        ") as product_catalog",
+
+        # Additional scalar columns (81-90)
+        "cast((id * 53) % 10000 as int) as metric_1",
+        "cast((id * 59) % 10000 as int) as metric_2",
+        "cast((id * 61) % 10000 as int) as metric_3",
+        "cast((id * 67) % 1000000 as long) as counter_1",
+        "cast((id * 71) % 1000000 as long) as counter_2",
+        "cast((id * 73) % 10000 as double) / 100.0 as measure_1",
+        "cast((id * 79) % 10000 as double) / 100.0 as measure_2",
+        "concat('label_', cast(id % 500 as string)) as label_1",
+        "concat('category_', cast(id % 200 as string)) as label_2",
+        "id % 17 = 0 as flag_1",
+
+        # Additional complex columns (91-95)
+        "array("
+        "  named_struct('ts', id * 1000, 'value', cast(id % 100 as double)),"
+        "  named_struct('ts', id * 1000 + 1000, 'value', cast((id + 1) % 100 
as double)),"
+        "  named_struct('ts', id * 1000 + 2000, 'value', cast((id + 2) % 100 
as double))"
+        ") as time_series",
+
+        "map("
+        "  'en', concat('English text ', cast(id % 100 as string)),"
+        "  'es', concat('Spanish texto ', cast(id % 100 as string)),"
+        "  'fr', concat('French texte ', cast(id % 100 as string))"
+        ") as translations",
+
+        "named_struct("
+        "  'rules', array("
+        "    named_struct('id', cast(id % 100 as int), 'condition', 
concat('cond_', cast(id % 10 as string)), 'action', concat('act_', cast(id % 5 
as string))),"
+        "    named_struct('id', cast((id + 1) % 100 as int), 'condition', 
concat('cond_', cast((id + 1) % 10 as string)), 'action', concat('act_', 
cast((id + 1) % 5 as string)))"
+        "  ),"
+        "  'default_action', 'none',"
+        "  'priority', cast(id % 10 as int)"
+        ") as rule_engine",
+
+        "array("
+        "  map('metric', 'cpu', 'value', cast(id % 100 as double), 'unit', 
'percent'),"
+        "  map('metric', 'memory', 'value', cast((id * 2) % 100 as double), 
'unit', 'percent'),"
+        "  map('metric', 'disk', 'value', cast((id * 3) % 100 as double), 
'unit', 'percent')"
+        ") as system_metrics",
+
+        "named_struct("
+        "  'permissions', map("
+        "    'read', array('user', 'admin'),"
+        "    'write', array('admin'),"
+        "    'delete', array('admin')"
+        "  ),"
+        "  'roles', array("
+        "    named_struct('name', 'viewer', 'level', 1),"
+        "    named_struct('name', 'editor', 'level', 2),"
+        "    named_struct('name', 'admin', 'level', 3)"
+        "  )"
+        ") as access_control",
+
+        # Final columns (96-100)
+        "cast((id * 83) % 1000 as int) as final_metric_1",
+        "cast((id * 89) % 10000 as double) / 100.0 as final_measure_1",
+        "concat('final_', cast(id % 1000 as string)) as final_label",
+        "array(cast(id % 5 as int), cast((id + 1) % 5 as int), cast((id + 2) % 
5 as int), cast((id + 3) % 5 as int), cast((id + 4) % 5 as int)) as 
final_array",
+        "named_struct("
+        "  'summary', concat('Summary for record ', cast(id as string)),"
+        "  'checksum', concat(cast(id % 256 as string), '-', cast((id * 7) % 
256 as string), '-', cast((id * 13) % 256 as string)),"
+        "  'version', cast(id % 100 as int)"
+        ") as final_metadata"
+    )
+
+    print(f"Generated schema with {len(df.columns)} columns")
+    df.printSchema()
+
+    # Write as parquet
+    df.write.mode("overwrite").parquet(output_path)
+
+    # Verify the data
+    written_df = spark.read.parquet(output_path)
+    actual_count = written_df.count()
+    print(f"Wrote {actual_count:,} rows to {output_path}")
+
+    spark.stop()
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Generate test data for shuffle benchmark"
+    )
+    parser.add_argument(
+        "--output", "-o",
+        required=True,
+        help="Output path for parquet data (local path or hdfs://...)"
+    )
+    parser.add_argument(
+        "--rows", "-r",
+        type=int,
+        default=10_000_000,
+        help="Number of rows to generate (default: 10000000 for ~1GB with wide 
schema)"
+    )
+    parser.add_argument(
+        "--partitions", "-p",
+        type=int,
+        default=None,
+        help="Number of output partitions (default: auto based on cluster)"
+    )
+
+    args = parser.parse_args()
+
+    # Default partitions to a reasonable number if not specified
+    num_partitions = args.partitions if args.partitions else 200
+
+    generate_data(args.output, args.rows, num_partitions)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/benchmarks/pyspark/run_all_benchmarks.sh 
b/benchmarks/pyspark/run_all_benchmarks.sh
new file mode 100755
index 000000000..707d971f2
--- /dev/null
+++ b/benchmarks/pyspark/run_all_benchmarks.sh
@@ -0,0 +1,118 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Run all shuffle benchmarks (Spark, Comet JVM, Comet Native)
+# Check the Spark UI during each run to compare shuffle sizes
+
+set -e
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+DATA_PATH="${1:-/tmp/shuffle-benchmark-data}"
+COMET_JAR="${COMET_JAR:-$SCRIPT_DIR/../spark/target/comet-spark-spark3.5_2.12-0.13.0-SNAPSHOT.jar}"
+SPARK_MASTER="${SPARK_MASTER:-local[*]}"
+EXECUTOR_MEMORY="${EXECUTOR_MEMORY:-16g}"
+EVENT_LOG_DIR="${EVENT_LOG_DIR:-/tmp/spark-events}"
+
+# Create event log directory
+mkdir -p "$EVENT_LOG_DIR"
+
+echo "========================================"
+echo "Shuffle Size Comparison Benchmark"
+echo "========================================"
+echo "Data path:       $DATA_PATH"
+echo "Comet JAR:       $COMET_JAR"
+echo "Spark master:    $SPARK_MASTER"
+echo "Executor memory: $EXECUTOR_MEMORY"
+echo "Event log dir:   $EVENT_LOG_DIR"
+echo "========================================"
+
+# Run Spark baseline (no Comet)
+echo ""
+echo ">>> Running SPARK shuffle benchmark..."
+$SPARK_HOME/bin/spark-submit \
+  --master "$SPARK_MASTER" \
+  --executor-memory "$EXECUTOR_MEMORY" \
+  --conf spark.eventLog.enabled=true \
+  --conf spark.eventLog.dir="$EVENT_LOG_DIR" \
+  --conf spark.comet.enabled=false \
+  --conf spark.comet.exec.shuffle.enabled=false \
+  "$SCRIPT_DIR/run_benchmark.py" \
+  --data "$DATA_PATH" \
+  --mode spark
+
+# Run Comet JVM shuffle
+echo ""
+echo ">>> Running COMET JVM shuffle benchmark..."
+$SPARK_HOME/bin/spark-submit \
+  --master "$SPARK_MASTER" \
+  --executor-memory "$EXECUTOR_MEMORY" \
+  --jars "$COMET_JAR" \
+  --driver-class-path "$COMET_JAR" \
+  --conf spark.executor.extraClassPath="$COMET_JAR" \
+  --conf spark.eventLog.enabled=true \
+  --conf spark.eventLog.dir="$EVENT_LOG_DIR" \
+  --conf spark.memory.offHeap.enabled=true \
+  --conf spark.memory.offHeap.size=16g \
+  --conf spark.comet.enabled=true \
+  --conf spark.comet.exec.enabled=true \
+  --conf spark.comet.exec.all.enabled=true \
+  --conf spark.comet.exec.shuffle.enabled=true \
+  --conf spark.comet.shuffle.mode=jvm \
+  --conf spark.comet.exec.shuffle.mode=jvm \
+  --conf spark.comet.exec.replaceSortMergeJoin=true \
+  --conf 
spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
 \
+  --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
+  --conf spark.comet.cast.allowIncompatible=true \
+  "$SCRIPT_DIR/run_benchmark.py" \
+  --data "$DATA_PATH" \
+  --mode jvm
+
+# Run Comet Native shuffle
+echo ""
+echo ">>> Running COMET NATIVE shuffle benchmark..."
+$SPARK_HOME/bin/spark-submit \
+  --master "$SPARK_MASTER" \
+  --executor-memory "$EXECUTOR_MEMORY" \
+  --jars "$COMET_JAR" \
+  --driver-class-path "$COMET_JAR" \
+  --conf spark.executor.extraClassPath="$COMET_JAR" \
+  --conf spark.eventLog.enabled=true \
+  --conf spark.eventLog.dir="$EVENT_LOG_DIR" \
+  --conf spark.memory.offHeap.enabled=true \
+  --conf spark.memory.offHeap.size=16g \
+  --conf spark.comet.enabled=true \
+  --conf spark.comet.exec.enabled=true \
+  --conf spark.comet.exec.all.enabled=true \
+  --conf spark.comet.exec.shuffle.enabled=true \
+  --conf spark.comet.exec.shuffle.mode=native \
+  --conf spark.comet.exec.replaceSortMergeJoin=true \
+  --conf 
spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
 \
+  --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
+  --conf spark.comet.cast.allowIncompatible=true \
+  "$SCRIPT_DIR/run_benchmark.py" \
+  --data "$DATA_PATH" \
+  --mode native
+
+echo ""
+echo "========================================"
+echo "BENCHMARK COMPLETE"
+echo "========================================"
+echo "Event logs written to: $EVENT_LOG_DIR"
+echo ""
diff --git a/benchmarks/pyspark/run_benchmark.py 
b/benchmarks/pyspark/run_benchmark.py
new file mode 100755
index 000000000..3f40b7c93
--- /dev/null
+++ b/benchmarks/pyspark/run_benchmark.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Run shuffle size comparison benchmark.
+
+Run this script once per mode (spark, jvm, native) with appropriate 
spark-submit configs.
+Check the Spark UI to compare shuffle sizes between modes.
+"""
+
+import argparse
+import time
+import json
+
+from pyspark.sql import SparkSession
+
+
+def run_benchmark(spark: SparkSession, data_path: str, mode: str) -> int:
+    """Run the benchmark query and return duration in ms."""
+
+    spark.catalog.clearCache()
+
+    df = spark.read.parquet(data_path)
+    row_count = df.count()
+    print(f"Number of rows: {row_count:,}")
+
+    start_time = time.time()
+
+    # Repartition by a different key to force full shuffle of all columns
+    # This shuffles all 50 columns including nested structs, arrays, maps
+    repartitioned = df.repartition(200, "group_key")
+
+    # Write to parquet to force materialization
+    output_path = f"/tmp/shuffle-benchmark-output-{mode}"
+    repartitioned.write.mode("overwrite").parquet(output_path)
+    print(f"Wrote repartitioned data to: {output_path}")
+
+    duration_ms = int((time.time() - start_time) * 1000)
+    return duration_ms
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Run shuffle benchmark for a single mode"
+    )
+    parser.add_argument(
+        "--data", "-d",
+        required=True,
+        help="Path to input parquet data"
+    )
+    parser.add_argument(
+        "--mode", "-m",
+        required=True,
+        choices=["spark", "jvm", "native"],
+        help="Shuffle mode being tested"
+    )
+
+    args = parser.parse_args()
+
+    spark = SparkSession.builder \
+        .appName(f"ShuffleBenchmark-{args.mode.upper()}") \
+        .getOrCreate()
+
+    print("\n" + "=" * 80)
+    print(f"Shuffle Benchmark: {args.mode.upper()}")
+    print("=" * 80)
+    print(f"Data path: {args.data}")
+
+    # Print shuffle configuration
+    conf = spark.sparkContext.getConf()
+    print(f"Shuffle manager: {conf.get('spark.shuffle.manager', 'default')}")
+    print(f"Comet enabled: {conf.get('spark.comet.enabled', 'false')}")
+    print(f"Comet shuffle enabled: 
{conf.get('spark.comet.exec.shuffle.enabled', 'false')}")
+    print(f"Comet shuffle mode: {conf.get('spark.comet.shuffle.mode', 'not 
set')}")
+    print(f"Spark UI: {spark.sparkContext.uiWebUrl}")
+
+    try:
+        duration_ms = run_benchmark(spark, args.data, args.mode)
+        print(f"\nDuration: {duration_ms:,} ms")
+        print("\nCheck Spark UI for shuffle sizes")
+
+    finally:
+        spark.stop()
+
+    print("=" * 80 + "\n")
+
+
+if __name__ == "__main__":
+    main()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to