andygrove opened a new issue, #3057:
URL: https://github.com/apache/datafusion-comet/issues/3057
### What is the problem the feature request solves?
We have seen poor performance with JVM shuffle in some cases, especially
when complex types are involved. I asked Claude to take a look and it came up
with some ideas that I plan on exploring.
### What Claude Said
Based on my review, here are the significant performance issues I found in
the sort-based shuffle implementation:
#### Performance Analysis: Sort-Based Shuffle
1. Per-Partition JNI Calls (Critical)
Location: CometShuffleExternalSorter.java:538-600
```java
while (sortedRecords.hasNext()) {
  // ...
  if (partition != currentPartition) {
    if (currentPartition != -1) {
      // JNI call for EACH partition
      long written = doSpilling(dataTypes, spillInfo.file, rowPartition, ...);
    }
    currentPartition = partition;
  }
  rowPartition.addRow(...);
}
```
With 200 partitions, this makes ~200 separate JNI calls to
writeSortedFileNative. Each call:
- Deserializes schema (converts Array[Array[Byte]] to Rust types)
- Gets JNI array elements
- Creates Arrow builders
- Opens file, writes, closes
Impact: JNI overhead multiplied by partition count. With many partitions,
this dominates execution time.
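A minimal sketch of the batched alternative (recommended fix 1 below), assuming a hypothetical native entry point that receives all sorted rows plus partition boundary offsets, so schema deserialization, JNI array access, and file setup happen once per spill instead of once per partition. `write_all_partitions` and `write_partition_rows` are illustrative names, not Comet's actual API:

```rust
use std::io;

// Hypothetical batched entry point: one JNI call per spill instead of one
// per partition. partition_offsets has num_partitions + 1 entries, so
// offsets[i]..offsets[i + 1] are the rows belonging to partition i.
fn write_all_partitions(
    row_addresses: &[i64],       // sorted row pointers, grouped by partition
    row_sizes: &[i32],           // matching row lengths in bytes
    partition_offsets: &[usize], // partition boundaries into the two arrays
) -> io::Result<Vec<u64>> {
    // Schema deserialization, builder setup, and output file handling would
    // happen once here, outside the per-partition loop.
    let mut bytes_written = Vec::with_capacity(partition_offsets.len().saturating_sub(1));
    for bounds in partition_offsets.windows(2) {
        let (start, end) = (bounds[0], bounds[1]);
        bytes_written.push(write_partition_rows(
            &row_addresses[start..end],
            &row_sizes[start..end],
        )?);
    }
    Ok(bytes_written)
}

// Stand-in for the existing per-partition row-to-Arrow conversion and write.
fn write_partition_rows(_addrs: &[i64], _sizes: &[i32]) -> io::Result<u64> {
    Ok(0)
}
```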
---
2. RowPartition Array Allocation Per Partition
Location: RowPartition.scala:39-54
```scala
def getRowAddresses: Array[Long] = {
  val array = rowAddresses.toArray // creates a NEW array copy
  rowAddresses = null
  array
}

def reset(): Unit = {
  rowAddresses = new ArrayBuffer[Long](initialSize) // NEW ArrayBuffer
  rowSizes = new ArrayBuffer[Int](initialSize)
}
```
For each partition boundary:
1. toArray allocates a new array and copies all elements
2. reset() creates new ArrayBuffers
Impact: O(partitions × rows_per_partition) allocations and copies.
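Part of what makes recommended fix 4 below pay off is that a long-lived primitive array plus a cursor can be consumed zero-copy on the native side. A sketch, assuming a hypothetical JNI surface that hands native code the array's base address and the cursor; the name and signature are illustrative:

```rust
// Hypothetical zero-copy view of a long-lived JVM long[] of row addresses:
// no toArray copy, no intermediate buffer. The caller must guarantee that
// addrs_ptr stays valid for `cursor` elements while the slice is in use.
unsafe fn view_row_addresses<'a>(addrs_ptr: *const i64, cursor: usize) -> &'a [i64] {
    std::slice::from_raw_parts(addrs_ptr, cursor)
}
```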
---
3. File Open/Close Per Batch in Native Code
Location: row.rs:826-831
```rust
while current_row < row_num {
    // ... build batch ...

    // Opens the file on EVERY batch iteration
    let mut output_data = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&output_path)?;
    output_data.write_all(&frozen)?;
    // file closed here (drop)
}
```
Impact: Repeated syscalls for file open/close on every batch within every
partition.
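A minimal sketch of the fix (recommended fix 2 below): hoist the open out of the batch loop and reuse the handle, giving one open/close pair per partition. `frozen_batches` is a stand-in for the serialized output of each batch iteration:

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};

// Sketch: one file handle for all batches of a partition.
fn write_partition(output_path: &str, frozen_batches: &[Vec<u8>]) -> io::Result<()> {
    // Opened once, before the batch loop.
    let mut output_data = OpenOptions::new()
        .create(true)
        .append(true)
        .open(output_path)?;
    for frozen in frozen_batches {
        output_data.write_all(frozen)?;
    }
    output_data.flush()?;
    Ok(()) // handle dropped (closed) once here
}
```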
---
4. Builder Re-creation Per Batch
Location: row.rs:782-787
```rust
while current_row < row_num {
    let mut data_builders: Vec<Box<dyn ArrayBuilder>> = vec![];
    schema.iter().try_for_each(|dt| {
        // NEW builders on each batch
        make_builders(dt, n, prefer_dictionary_ratio)
            .map(|builder| data_builders.push(builder))
    })?;
    // ...
}
```
Impact: Heap allocations for builders on every batch. Builders could be
reset and reused.
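A sketch of builder reuse (recommended fix 3 below), assuming arrow-rs primitive builders: `finish()` returns the accumulated array and resets the builder, so the builder object itself survives across batches. Shown for a single Int64 column for brevity:

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Builder};

// Sketch: the builder is allocated once, outside the batch loop, and
// finish() is called per batch. Whether internal buffers are retained
// across finish() calls depends on the builder implementation, but the
// per-batch Vec<Box<dyn ArrayBuilder>> allocation goes away.
fn build_column_batches(batches: &[Vec<i64>]) -> Vec<ArrayRef> {
    let mut builder = Int64Builder::new();
    let mut arrays = Vec::with_capacity(batches.len());
    for batch in batches {
        for v in batch {
            builder.append_value(*v);
        }
        // finish() hands back this batch's array and leaves the builder
        // empty, ready for the next batch.
        arrays.push(Arc::new(builder.finish()) as ArrayRef);
    }
    arrays
}
```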
---
5. Column-by-Column Row Access (Cache Unfriendly)
Location: row.rs:792-803
```rust
for (idx, builder) in data_builders.iter_mut().enumerate() {
    append_columns(row_addresses_ptr, row_sizes_ptr, row_start, row_end,
        schema, idx, builder, ...)?;
}
```
Inside append_columns:
```rust
for i in row_start..row_end {
    let row_addr = unsafe { *row_addresses_ptr.add(i) };
    // access row data for column idx
}
```
Impact: Each column pass iterates over all rows, so every row's data is re-read once per column, causing cache misses.
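A sketch of the row-major alternative (recommended fix 5 below), showing the access-pattern change only: each row is visited once and appended to every column builder while its data is still in cache. `append_field` is a hypothetical stand-in for the per-type decode-and-append logic in append_columns:

```rust
// Sketch: invert the loop nest from column-major to row-major. Instead of
// scanning all rows once per column, scan rows once and touch every column.
fn process_rows_row_major(
    rows: &[&[u8]],                             // one row buffer per row
    num_columns: usize,
    mut append_field: impl FnMut(usize, &[u8]), // (column index, row bytes)
) {
    for row in rows {
        // All columns of this row are appended while the row is hot in cache.
        for col in 0..num_columns {
            append_field(col, row);
        }
    }
}
```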
---
6. Double Data Copy Path
Location: CometUnsafeShuffleWriter.java:296-310 and
CometShuffleExternalSorter.java:642-649
```java
// Writer: serialize to buffer
serOutputStream.writeValue((UnsafeRow) record._2(), OBJECT_CLASS_TAG);

// Sorter: copy to memory page
Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
```
Data path: UnsafeRow → serialized buffer → memory page → native Arrow
conversion
---
#### Recommended Fixes (Priority Order)
1. Batch partition writes: Collect all rows across partitions, then make a
single JNI call that writes all partitions. Pass partition boundaries to native
code.
2. Reuse file handle: Open file once at start of
process_sorted_row_partition, write all batches, close once.
3. Reuse Arrow builders: Create builders once, call builder.finish() and
recreate only when schema changes, or use builder reset patterns.
4. Pre-allocate RowPartition arrays: Use primitive arrays with a cursor
instead of ArrayBuffer to avoid toArray copies.
5. Row-major processing: Consider processing row-by-row to improve cache
locality, appending to all column builders in one pass per row.
### Describe the potential solution
_No response_
### Additional context
_No response_