This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new e87e1a36c perf: Improve benchmarks for native row-to-columnar used by JVM shuffle (#3290)
e87e1a36c is described below

commit e87e1a36ccd3019dac4291da8457731de1f3e838
Author: Andy Grove <[email protected]>
AuthorDate: Wed Feb 11 08:40:11 2026 -0700

    perf: Improve benchmarks for native row-to-columnar used by JVM shuffle (#3290)
---
 native/core/benches/row_columnar.rs | 382 +++++++++++++++++++++++++++++++-----
 1 file changed, 331 insertions(+), 51 deletions(-)

diff --git a/native/core/benches/row_columnar.rs b/native/core/benches/row_columnar.rs
index a040b25eb..4ee153906 100644
--- a/native/core/benches/row_columnar.rs
+++ b/native/core/benches/row_columnar.rs
@@ -15,65 +15,247 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::datatypes::DataType as ArrowDataType;
+//! Benchmarks for JVM shuffle row-to-columnar conversion.
+//!
+//! Measures `process_sorted_row_partition()` performance for converting Spark
+//! UnsafeRow data to Arrow arrays, covering primitive, struct (flat/nested),
+//! list, and map types.
+
+use arrow::datatypes::{DataType as ArrowDataType, Field, Fields};
 use comet::execution::shuffle::spark_unsafe::row::{
     process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow,
 };
 use comet::execution::shuffle::CompressionCodec;
-use criterion::{criterion_group, criterion_main, Criterion};
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use std::sync::Arc;
 use tempfile::Builder;
 
-const NUM_ROWS: usize = 10000;
 const BATCH_SIZE: usize = 5000;
-const NUM_COLS: usize = 100;
-const ROW_SIZE: usize = SparkUnsafeRow::get_row_bitset_width(NUM_COLS) + NUM_COLS * 8;
 
-fn benchmark(c: &mut Criterion) {
-    let mut group = c.benchmark_group("row_array_conversion");
+/// Size of an Int64 value in bytes.
+const INT64_SIZE: usize = 8;
 
-    group.bench_function("row_to_array", |b| {
-        let spark_rows = (0..NUM_ROWS)
-            .map(|_| {
-                let mut spark_row = SparkUnsafeRow::new_with_num_fields(NUM_COLS);
-                let mut row = Row::new();
+/// Size of a pointer in Spark's UnsafeRow format. Encodes a 32-bit offset
+/// (upper bits) and 32-bit size (lower bits) — always 8 bytes regardless of
+/// hardware architecture.
+const UNSAFE_ROW_POINTER_SIZE: usize = 8;
 
-                for i in SparkUnsafeRow::get_row_bitset_width(NUM_COLS)..ROW_SIZE {
-                    row.data[i] = i as u8;
-                }
+/// Size of the element-count field in UnsafeRow array/map headers.
+const ARRAY_HEADER_SIZE: usize = 8;
 
-                row.to_spark_row(&mut spark_row);
+// ─── UnsafeRow helpers ──────────────────────────────────────────────────────
 
-                for i in 0..NUM_COLS {
-                    spark_row.set_not_null_at(i);
-                }
+/// Write an UnsafeRow offset+size pointer at `pos` in `data`.
+///
+/// The pointer is one little-endian `i64`: `offset` is shifted into the upper
+/// 32 bits and `size` is OR-ed into the lower bits. Both are converted with
+/// `as`, so a `size` wider than 32 bits would spill into the offset half.
+///
+/// Panics if `data` is shorter than `pos + UNSAFE_ROW_POINTER_SIZE`.
+fn write_pointer(data: &mut [u8], pos: usize, offset: usize, size: usize) {
+    let value = ((offset as i64) << 32) | (size as i64);
+    data[pos..pos + UNSAFE_ROW_POINTER_SIZE].copy_from_slice(&value.to_le_bytes());
+}
 
-                spark_row
-            })
-            .collect::<Vec<_>>();
-
-        let mut row_addresses = spark_rows
-            .iter()
-            .map(|row| row.get_row_addr())
-            .collect::<Vec<_>>();
-        let mut row_sizes = spark_rows
-            .iter()
-            .map(|row| row.get_row_size())
-            .collect::<Vec<_>>();
-
-        let row_address_ptr = row_addresses.as_mut_ptr();
-        let row_size_ptr = row_sizes.as_mut_ptr();
-        let schema = vec![ArrowDataType::Int64; NUM_COLS];
+/// Byte size of a null-bitset for `n` elements (64-bit words, rounded up).
+///
+/// One bit per element, padded to whole 8-byte words: `n = 0` gives 0,
+/// `n = 1..=64` gives 8, and `n = 65` gives 16.
+fn null_bitset_size(n: usize) -> usize {
+    n.div_ceil(64) * 8
+}
 
-        b.iter(|| {
-            let tempfile = Builder::new().tempfile().unwrap();
+// ─── Schema builders ────────────────────────────────────────────────────────
+
+/// Create a struct schema with `depth` nesting levels and `num_leaf_fields`
+/// Int64 leaf fields.
+///
+/// - depth=1: `Struct<f0: Int64, f1: Int64, …>`
+/// - depth=2: `Struct<nested: Struct<f0: Int64, …>>`
+/// - depth=3: `Struct<nested: Struct<nested: Struct<f0: Int64, …>>>`
+///
+/// Every field, leaf or wrapper, is created with its nullable flag set to
+/// `true`. `depth` must be at least 1: `depth - 1` underflows `usize` when
+/// `depth` is 0.
+fn make_struct_schema(depth: usize, num_leaf_fields: usize) -> ArrowDataType {
+    let leaf_fields: Vec<Field> = (0..num_leaf_fields)
+        .map(|i| Field::new(format!("f{i}"), ArrowDataType::Int64, true))
+        .collect();
+    let mut dt = ArrowDataType::Struct(Fields::from(leaf_fields));
+    // Wrap the leaf struct in `depth - 1` single-field structs, innermost first.
+    for _ in 0..depth - 1 {
+        dt = ArrowDataType::Struct(Fields::from(vec![Field::new("nested", dt, true)]));
+    }
+    dt
+}
+
+/// Schema for a `List<Int64>` column whose single child field is named
+/// `"item"` and is created with its nullable flag set to `true`.
+fn make_list_schema() -> ArrowDataType {
+    ArrowDataType::List(Arc::new(Field::new("item", ArrowDataType::Int64, true)))
+}
+
+/// Schema for a `Map<Int64, Int64>` column.
+///
+/// The map wraps a non-nullable `"entries"` struct holding a non-nullable
+/// Int64 `"key"` and a nullable Int64 `"value"`. The trailing `false` passed
+/// to `ArrowDataType::Map` is Arrow's keys-sorted flag.
+fn make_map_schema() -> ArrowDataType {
+    let entries = Field::new(
+        "entries",
+        ArrowDataType::Struct(Fields::from(vec![
+            Field::new("key", ArrowDataType::Int64, false),
+            Field::new("value", ArrowDataType::Int64, true),
+        ])),
+        false,
+    );
+    ArrowDataType::Map(Arc::new(entries), false)
+}
+
+// ─── Row data builders ──────────────────────────────────────────────────────
+
+/// Build a binary UnsafeRow containing a struct column with `depth` nesting
+/// levels and `num_leaf_fields` Int64 fields at the innermost level.
+///
+/// Buffer layout, in order:
+/// - top-level row: null bitset for one field, then one pointer;
+/// - `depth - 1` intermediate structs: null bitset for one field, then one
+///   pointer to the next level;
+/// - leaf struct: null bitset, then `num_leaf_fields` Int64 values where
+///   field `i` holds `i * 100`.
+///
+/// The buffer starts zeroed and only pointer slots and leaf values are
+/// written, so every null bitset stays zero. `depth` must be at least 1:
+/// `depth - 1` underflows `usize` when `depth` is 0.
+fn build_struct_row(depth: usize, num_leaf_fields: usize) -> Vec<u8> {
+    let top_bitset = SparkUnsafeRow::get_row_bitset_width(1);
+    let inter_bitset = SparkUnsafeRow::get_row_bitset_width(1);
+    let leaf_bitset = SparkUnsafeRow::get_row_bitset_width(num_leaf_fields);
+
+    let inter_level_size = inter_bitset + UNSAFE_ROW_POINTER_SIZE;
+    let leaf_level_size = leaf_bitset + num_leaf_fields * INT64_SIZE;
+
+    let total =
+        top_bitset + UNSAFE_ROW_POINTER_SIZE + (depth - 1) * inter_level_size + leaf_level_size;
+    let mut data = vec![0u8; total];
+
+    // Absolute start position of each struct level in the buffer
+    let mut struct_starts = Vec::with_capacity(depth);
+    let mut pos = top_bitset + UNSAFE_ROW_POINTER_SIZE;
+    for level in 0..depth {
+        struct_starts.push(pos);
+        if level < depth - 1 {
+            pos += inter_level_size;
+        }
+    }
+
+    // Top-level pointer → first struct (absolute offset from row start)
+    let first_size = if depth == 1 {
+        leaf_level_size
+    } else {
+        inter_level_size
+    };
+    write_pointer(&mut data, top_bitset, struct_starts[0], first_size);
+
+    // Intermediate struct pointers (offsets relative to their own struct start)
+    for level in 0..depth - 1 {
+        let next_size = if level + 1 == depth - 1 {
+            leaf_level_size
+        } else {
+            inter_level_size
+        };
+        write_pointer(
+            &mut data,
+            struct_starts[level] + inter_bitset,
+            struct_starts[level + 1] - struct_starts[level],
+            next_size,
+        );
+    }
+
+    // Fill leaf struct with sample data
+    let leaf_start = *struct_starts.last().unwrap();
+    for i in 0..num_leaf_fields {
+        let off = leaf_start + leaf_bitset + i * INT64_SIZE;
+        data[off..off + INT64_SIZE].copy_from_slice(&((i as i64) * 100).to_le_bytes());
+    }
+
+    data
+}
+
+/// Build a binary UnsafeRow containing a `List<Int64>` column.
+///
+/// Buffer layout, in order: row null bitset for one field, one pointer to the
+/// list, the element count, the element null bitset, then `num_elements`
+/// Int64 values where element `i` holds `i * 100`. Both bitsets stay zeroed
+/// because the buffer starts zeroed and they are never written.
+fn build_list_row(num_elements: usize) -> Vec<u8> {
+    let top_bitset = SparkUnsafeRow::get_row_bitset_width(1);
+    let elem_null_bitset = null_bitset_size(num_elements);
+    let list_size = ARRAY_HEADER_SIZE + elem_null_bitset + num_elements * INT64_SIZE;
+    let total = top_bitset + UNSAFE_ROW_POINTER_SIZE + list_size;
+    let mut data = vec![0u8; total];
+
+    // The list payload starts right after the single top-level pointer slot.
+    let list_offset = top_bitset + UNSAFE_ROW_POINTER_SIZE;
+    write_pointer(&mut data, top_bitset, list_offset, list_size);
+
+    // Element count
+    data[list_offset..list_offset + ARRAY_HEADER_SIZE]
+        .copy_from_slice(&(num_elements as i64).to_le_bytes());
+
+    // Element values
+    let data_start = list_offset + ARRAY_HEADER_SIZE + elem_null_bitset;
+    for i in 0..num_elements {
+        let off = data_start + i * INT64_SIZE;
+        data[off..off + INT64_SIZE].copy_from_slice(&((i as i64) * 100).to_le_bytes());
+    }
+
+    data
+}
+
+/// Build a binary UnsafeRow containing a `Map<Int64, Int64>` column.
+///
+/// Buffer layout, in order: row null bitset for one field, one pointer to the
+/// map, an 8-byte header holding the key array's size in bytes, the key
+/// array, then the value array. Each array is an element count, a null
+/// bitset and `num_entries` Int64 values. Key `i` is `i` and value `i` is
+/// `i * 100`. All null bitsets stay zeroed because they are never written.
+fn build_map_row(num_entries: usize) -> Vec<u8> {
+    let top_bitset = SparkUnsafeRow::get_row_bitset_width(1);
+    let entry_null_bitset = null_bitset_size(num_entries);
+    let array_size = ARRAY_HEADER_SIZE + entry_null_bitset + num_entries * INT64_SIZE;
+    // Map layout: [key_array_size header] [key_array] [value_array]
+    let map_size = ARRAY_HEADER_SIZE + 2 * array_size;
+    let total = top_bitset + UNSAFE_ROW_POINTER_SIZE + map_size;
+    let mut data = vec![0u8; total];
+
+    let map_offset = top_bitset + UNSAFE_ROW_POINTER_SIZE;
+    write_pointer(&mut data, top_bitset, map_offset, map_size);
+
+    // Key array size header
+    data[map_offset..map_offset + ARRAY_HEADER_SIZE]
+        .copy_from_slice(&(array_size as i64).to_le_bytes());
 
+    // Key array: [element count] [null bitset] [data]
+    let key_offset = map_offset + ARRAY_HEADER_SIZE;
+    data[key_offset..key_offset + ARRAY_HEADER_SIZE]
+        .copy_from_slice(&(num_entries as i64).to_le_bytes());
+    let key_data = key_offset + ARRAY_HEADER_SIZE + entry_null_bitset;
+    for i in 0..num_entries {
+        let off = key_data + i * INT64_SIZE;
+        data[off..off + INT64_SIZE].copy_from_slice(&(i as i64).to_le_bytes());
+    }
+
+    // Value array: [element count] [null bitset] [data]
+    let val_offset = key_offset + array_size;
+    data[val_offset..val_offset + ARRAY_HEADER_SIZE]
+        .copy_from_slice(&(num_entries as i64).to_le_bytes());
+    let val_data = val_offset + ARRAY_HEADER_SIZE + entry_null_bitset;
+    for i in 0..num_entries {
+        let off = val_data + i * INT64_SIZE;
+        data[off..off + INT64_SIZE].copy_from_slice(&((i as i64) * 100).to_le_bytes());
+    }
+
+    data
+}
+
+// ─── Benchmark runner ───────────────────────────────────────────────────────
+
+/// Common benchmark harness: wraps raw row bytes in SparkUnsafeRow and runs
+/// `process_sorted_row_partition` under Criterion.
+fn run_benchmark(
+    group: &mut criterion::BenchmarkGroup<criterion::measurement::WallTime>,
+    name: &str,
+    param: &str,
+    schema: &[ArrowDataType],
+    rows: &[Vec<u8>],
+    num_top_level_fields: usize,
+) {
+    let num_rows = rows.len();
+
+    let spark_rows: Vec<SparkUnsafeRow> = rows
+        .iter()
+        .map(|data| {
+            let mut row = SparkUnsafeRow::new_with_num_fields(num_top_level_fields);
+            row.point_to_slice(data);
+            for i in 0..num_top_level_fields {
+                row.set_not_null_at(i);
+            }
+            row
+        })
+        .collect();
+
+    let mut addrs: Vec<i64> = spark_rows.iter().map(|r| r.get_row_addr()).collect();
+    let mut sizes: Vec<i32> = spark_rows.iter().map(|r| r.get_row_size()).collect();
+    let addr_ptr = addrs.as_mut_ptr();
+    let size_ptr = sizes.as_mut_ptr();
+
+    group.bench_with_input(BenchmarkId::new(name, param), &num_rows, |b, &n| {
+        b.iter(|| {
+            let tmp = Builder::new().tempfile().unwrap();
             process_sorted_row_partition(
-                NUM_ROWS,
+                n,
                 BATCH_SIZE,
-                row_address_ptr,
-                row_size_ptr,
-                &schema,
-                tempfile.path().to_str().unwrap().to_string(),
+                addr_ptr,
+                size_ptr,
+                schema,
+                tmp.path().to_str().unwrap().to_string(),
                 1.0,
                 false,
                 0,
@@ -83,22 +265,117 @@ fn benchmark(c: &mut Criterion) {
             .unwrap();
         });
     });
+
+    drop(spark_rows);
 }
 
-struct Row {
-    data: Box<[u8; ROW_SIZE]>,
+// ─── Benchmarks ─────────────────────────────────────────────────────────────
+
+/// 100 primitive Int64 columns — baseline without complex-type overhead.
+///
+/// Runs once with 1000 rows and once with 10000 rows. Each row is a zeroed
+/// null bitset followed by `NUM_COLS * INT64_SIZE` payload bytes, where the
+/// byte at buffer index `i` is set to `i as u8`.
+fn benchmark_primitive_columns(c: &mut Criterion) {
+    let mut group = c.benchmark_group("primitive_columns");
+    const NUM_COLS: usize = 100;
+    let bitset = SparkUnsafeRow::get_row_bitset_width(NUM_COLS);
+    let row_size = bitset + NUM_COLS * INT64_SIZE;
+
+    for num_rows in [1000, 10000] {
+        let schema = vec![ArrowDataType::Int64; NUM_COLS];
+        let rows: Vec<Vec<u8>> = (0..num_rows)
+            .map(|_| {
+                let mut data = vec![0u8; row_size];
+                // Skip the bitset so it stays zeroed; fill only the payload bytes.
+                for (i, byte) in data.iter_mut().enumerate().take(row_size).skip(bitset) {
+                    *byte = i as u8;
+                }
+                data
+            })
+            .collect();
+
+        run_benchmark(
+            &mut group,
+            "cols_100",
+            &format!("rows_{num_rows}"),
+            &schema,
+            &rows,
+            NUM_COLS,
+        );
+    }
+
+    group.finish();
 }
 
-impl Row {
-    pub fn new() -> Self {
-        Row {
-            data: Box::new([0u8; ROW_SIZE]),
+/// Struct columns at varying nesting depths (1 = flat, 2 = nested, 3 = deeply nested).
+///
+/// Covers every combination of depth {1, 2, 3}, leaf field count {5, 10, 20}
+/// and row count {1000, 10000}. All rows in one run are identical buffers
+/// produced by `build_struct_row`, and the schema has a single top-level
+/// struct column.
+fn benchmark_struct_conversion(c: &mut Criterion) {
+    let mut group = c.benchmark_group("struct_conversion");
+
+    for (depth, label) in [(1, "flat"), (2, "nested"), (3, "deeply_nested")] {
+        for num_fields in [5, 10, 20] {
+            for num_rows in [1000, 10000] {
+                let schema = vec![make_struct_schema(depth, num_fields)];
+                let rows: Vec<Vec<u8>> = (0..num_rows)
+                    .map(|_| build_struct_row(depth, num_fields))
+                    .collect();
+
+                run_benchmark(
+                    &mut group,
+                    &format!("{label}_fields_{num_fields}"),
+                    &format!("rows_{num_rows}"),
+                    &schema,
+                    &rows,
+                    1,
+                );
+            }
         }
     }
 
-    pub fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) {
-        spark_row.point_to_slice(self.data.as_ref());
+    group.finish();
+}
+
+/// List<Int64> columns with varying element counts.
+///
+/// Covers every combination of element count {10, 100} and row count
+/// {1000, 10000}. All rows in one run are identical buffers produced by
+/// `build_list_row`, and the schema has a single top-level list column.
+fn benchmark_list_conversion(c: &mut Criterion) {
+    let mut group = c.benchmark_group("list_conversion");
+
+    for num_elements in [10, 100] {
+        for num_rows in [1000, 10000] {
+            let schema = vec![make_list_schema()];
+            let rows: Vec<Vec<u8>> = (0..num_rows)
+                .map(|_| build_list_row(num_elements))
+                .collect();
+
+            run_benchmark(
+                &mut group,
+                &format!("elements_{num_elements}"),
+                &format!("rows_{num_rows}"),
+                &schema,
+                &rows,
+                1,
+            );
+        }
     }
+
+    group.finish();
+}
+
+/// Map<Int64, Int64> columns with varying entry counts.
+///
+/// Covers every combination of entry count {10, 100} and row count
+/// {1000, 10000}. All rows in one run are identical buffers produced by
+/// `build_map_row`, and the schema has a single top-level map column.
+fn benchmark_map_conversion(c: &mut Criterion) {
+    let mut group = c.benchmark_group("map_conversion");
+
+    for num_entries in [10, 100] {
+        for num_rows in [1000, 10000] {
+            let schema = vec![make_map_schema()];
+            let rows: Vec<Vec<u8>> = (0..num_rows).map(|_| build_map_row(num_entries)).collect();
+
+            run_benchmark(
+                &mut group,
+                &format!("entries_{num_entries}"),
+                &format!("rows_{num_rows}"),
+                &schema,
+                &rows,
+                1,
+            );
+        }
+    }
+
+    group.finish();
 }
 
 fn config() -> Criterion {
@@ -108,6 +385,9 @@ fn config() -> Criterion {
 criterion_group! {
     name = benches;
     config = config();
-    targets = benchmark
+    targets = benchmark_primitive_columns,
+              benchmark_struct_conversion,
+              benchmark_list_conversion,
+              benchmark_map_conversion
 }
 criterion_main!(benches);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to