This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new ec5df97b4 feat: Add support for round-robin partitioning in native 
shuffle (#3076)
ec5df97b4 is described below

commit ec5df97b40c366646aa7c59514f49c8ba64d588a
Author: Andy Grove <[email protected]>
AuthorDate: Sun Jan 25 05:41:41 2026 -0700

    feat: Add support for round-robin partitioning in native shuffle (#3076)
---
 .../main/scala/org/apache/comet/CometConf.scala    |  30 ++++
 docs/source/contributor-guide/jvm_shuffle.md       |   8 +-
 docs/source/contributor-guide/native_shuffle.md    |  22 ++-
 docs/source/user-guide/latest/compatibility.md     |  26 ++++
 native/core/src/execution/planner.rs               |  12 ++
 .../src/execution/shuffle/comet_partitioning.rs    |   6 +-
 .../core/src/execution/shuffle/shuffle_writer.rs   | 161 ++++++++++++++++++++-
 native/proto/src/proto/partitioning.proto          |   7 +
 .../shuffle/CometNativeShuffleWriter.scala         |  12 +-
 .../shuffle/CometShuffleExchangeExec.scala         |  31 +++-
 .../comet/exec/CometNativeShuffleSuite.scala       |  49 +++++++
 11 files changed, 348 insertions(+), 16 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala 
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 656dbc9a5..1e7adfd58 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -375,6 +375,36 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(true)
 
+  val COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED: 
ConfigEntry[Boolean] =
+    conf("spark.comet.native.shuffle.partitioning.roundrobin.enabled")
+      .category(CATEGORY_SHUFFLE)
+      .doc(
+        "Whether to enable round robin partitioning for Comet native shuffle. 
" +
+          "This is disabled by default because Comet's round-robin produces 
different " +
+          "partition assignments than Spark. Spark sorts rows by their binary 
UnsafeRow " +
+          "representation before assigning partitions, but Comet uses Arrow 
format which " +
+          "has a different binary layout. Instead, Comet implements 
round-robin as hash " +
+          "partitioning on all columns, which achieves the same goals: even 
distribution, " +
+          "deterministic output (for fault tolerance), and no semantic 
grouping. " +
+          "Sorted output will be identical to Spark, but unsorted row ordering 
may differ.")
+      .booleanConf
+      .createWithDefault(false)
+
+  val COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_MAX_HASH_COLUMNS: 
ConfigEntry[Int] =
+    conf("spark.comet.native.shuffle.partitioning.roundrobin.maxHashColumns")
+      .category(CATEGORY_SHUFFLE)
+      .doc(
+        "The maximum number of columns to hash for round robin partitioning. " 
+
+          "When set to 0 (the default), all columns are hashed. " +
+          "When set to a positive value, only the first N columns are used for 
hashing, " +
+          "which can improve performance for wide tables while still providing 
" +
+          "reasonable distribution.")
+      .intConf
+      .checkValue(
+        v => v >= 0,
+        "The maximum number of columns to hash for round robin partitioning 
must be non-negative.")
+      .createWithDefault(0)
+
   val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
       .category(CATEGORY_SHUFFLE)
diff --git a/docs/source/contributor-guide/jvm_shuffle.md 
b/docs/source/contributor-guide/jvm_shuffle.md
index e011651d2..2145c82eb 100644
--- a/docs/source/contributor-guide/jvm_shuffle.md
+++ b/docs/source/contributor-guide/jvm_shuffle.md
@@ -46,12 +46,10 @@ JVM shuffle (`CometColumnarExchange`) is used instead of 
native shuffle (`CometE
    (not a `CometPlan`), JVM shuffle is the only option since native shuffle 
requires columnar input
    from Comet operators.
 
-3. **Unsupported partitioning type**: Native shuffle only supports 
`HashPartitioning`, `RangePartitioning`,
-   and `SinglePartition`. JVM shuffle additionally supports 
`RoundRobinPartitioning`.
-
-4. **Unsupported partition key types**: For `HashPartitioning` and 
`RangePartitioning`, native shuffle
+3. **Unsupported partition key types**: For `HashPartitioning` and 
`RangePartitioning`, native shuffle
    only supports primitive types as partition keys. Complex types (struct, 
array, map) cannot be used
-   as partition keys in native shuffle, though they are fully supported as 
data columns in both implementations.
+   as partition keys in native shuffle and will fall back to JVM columnar 
shuffle. Note that complex types are
+   fully supported as data columns in both implementations.
 
 ## Input Handling
 
diff --git a/docs/source/contributor-guide/native_shuffle.md 
b/docs/source/contributor-guide/native_shuffle.md
index e3d2dea47..18e80a90c 100644
--- a/docs/source/contributor-guide/native_shuffle.md
+++ b/docs/source/contributor-guide/native_shuffle.md
@@ -52,8 +52,7 @@ Native shuffle (`CometExchange`) is selected when all of the 
following condition
    - `HashPartitioning`
    - `RangePartitioning`
    - `SinglePartition`
-
-   `RoundRobinPartitioning` requires JVM shuffle.
+   - `RoundRobinPartitioning`
 
 4. **Supported partition key types**: For `HashPartitioning` and 
`RangePartitioning`, partition
    keys must be primitive types. Complex types (struct, array, map) as 
partition keys require
@@ -131,7 +130,7 @@ Native shuffle (`CometExchange`) is selected when all of 
the following condition
 2. **Native execution**: `CometExec.getCometIterator()` executes the plan in 
Rust.
 
 3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the 
appropriate partitioner:
-   - `MultiPartitionShuffleRepartitioner`: For hash/range partitioning
+   - `MultiPartitionShuffleRepartitioner`: For hash/range/round-robin 
partitioning
    - `SinglePartitionShufflePartitioner`: For single partition (simpler path)
 
 4. **Buffering and spilling**: The partitioner buffers rows per partition. 
When memory pressure
@@ -187,6 +186,19 @@ For range partitioning:
 The simplest case: all rows go to partition 0. Uses 
`SinglePartitionShufflePartitioner` which
 simply concatenates batches to reach the configured batch size.
 
+### Round Robin Partitioning
+
+Comet implements round robin partitioning using hash-based assignment for 
determinism:
+
+1. Computes a Murmur3 hash of columns (using seed 42)
+2. Assigns partitions directly using the hash: `partition_id = hash % 
num_partitions`
+
+This approach guarantees determinism across retries, which is critical for 
fault tolerance.
+However, unlike true round robin which cycles through partitions row-by-row, 
hash-based
+assignment only provides even distribution when the data has sufficient 
variation in the
+hashed columns. Data with low cardinality or identical values may result in 
skewed partition
+sizes.
+
 ## Memory Management
 
 Native shuffle uses DataFusion's memory management with spilling support:
@@ -235,8 +247,8 @@ independently compressed, allowing parallel decompression 
during reads.
 | ------------------- | -------------------------------------- | 
--------------------------------- |
 | Input format        | Columnar (direct from Comet operators) | Row-based 
(via ColumnarToRowExec) |
 | Partitioning logic  | Rust implementation                    | Spark's 
partitioner               |
-| Supported schemes   | Hash, Range, Single                    | Hash, Range, 
Single, RoundRobin   |
-| Partition key types | Primitives only                        | Any type      
                    |
+| Supported schemes   | Hash, Range, Single, RoundRobin        | Hash, Range, 
Single, RoundRobin   |
+| Partition key types | Primitives only (Hash, Range)          | Any type      
                    |
 | Performance         | Higher (no format conversion)          | Lower 
(columnar→row→columnar)     |
 | Writer variants     | Single path                            | Bypass (hash) 
and sort-based      |
 
diff --git a/docs/source/user-guide/latest/compatibility.md 
b/docs/source/user-guide/latest/compatibility.md
index 64bd9d2bc..c09f6a61e 100644
--- a/docs/source/user-guide/latest/compatibility.md
+++ b/docs/source/user-guide/latest/compatibility.md
@@ -69,6 +69,32 @@ this can be overridden by setting 
`spark.comet.regexp.allowIncompatible=true`.
 Comet's support for window functions is incomplete and known to be incorrect. 
It is disabled by default and
 should not be used in production. The feature will be enabled in a future 
release. Tracking issue: 
[#2721](https://github.com/apache/datafusion-comet/issues/2721).
 
+## Round-Robin Partitioning
+
+Comet's native shuffle implementation of round-robin partitioning 
(`df.repartition(n)`) is not compatible with
+Spark's implementation and is disabled by default. It can be enabled by setting
+`spark.comet.native.shuffle.partitioning.roundrobin.enabled=true`.
+
+**Why the incompatibility exists:**
+
+Spark's round-robin partitioning sorts rows by their binary `UnsafeRow` 
representation before assigning them to
+partitions. This ensures deterministic output for fault tolerance (task 
retries produce identical results).
+Comet uses Arrow format internally, which has a completely different binary 
layout than `UnsafeRow`, making it
+impossible to match Spark's exact partition assignments.
+
+**Comet's approach:**
+
+Instead of true round-robin assignment, Comet implements round-robin as hash 
partitioning on ALL columns. This
+achieves the same semantic goals:
+
+- **Even distribution**: Rows are distributed evenly across partitions (as 
long as the hash varies sufficiently -
+  in some cases there could be skew)
+- **Deterministic**: Same input always produces the same partition assignments 
(important for fault tolerance)
+- **No semantic grouping**: Unlike hash partitioning on specific columns, this 
doesn't group related rows together
+
+The only difference is that Comet's partition assignments will differ from 
Spark's. When results are sorted,
+they will be identical to Spark. Unsorted results may have different row 
ordering.
+
 ## Cast
 
 Cast operations in Comet fall into three levels of support:
diff --git a/native/core/src/execution/planner.rs 
b/native/core/src/execution/planner.rs
index 6a4ad97f8..44ff20a44 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -2342,6 +2342,18 @@ impl PhysicalPlanner {
                 ))
             }
             PartitioningStruct::SinglePartition(_) => 
Ok(CometPartitioning::SinglePartition),
+            PartitioningStruct::RoundRobinPartition(rr_partition) => {
+                // Treat negative max_hash_columns as 0 (no limit)
+                let max_hash_columns = if rr_partition.max_hash_columns <= 0 {
+                    0
+                } else {
+                    rr_partition.max_hash_columns as usize
+                };
+                Ok(CometPartitioning::RoundRobin(
+                    rr_partition.num_partitions as usize,
+                    max_hash_columns,
+                ))
+            }
         }
     }
 
diff --git a/native/core/src/execution/shuffle/comet_partitioning.rs 
b/native/core/src/execution/shuffle/comet_partitioning.rs
index a2422cf9e..b7ad15879 100644
--- a/native/core/src/execution/shuffle/comet_partitioning.rs
+++ b/native/core/src/execution/shuffle/comet_partitioning.rs
@@ -31,6 +31,10 @@ pub enum CometPartitioning {
     /// Rows for comparing to 4) OwnedRows that represent the boundaries of 
each partition, used with
     /// LexOrdering to bin each value in the RecordBatch to a partition.
     RangePartitioning(LexOrdering, usize, Arc<RowConverter>, Vec<OwnedRow>),
+    /// Round robin partitioning. Distributes rows across partitions by 
sorting them by hash
+    /// (computed from columns) and then assigning partitions sequentially. 
Args are:
+    /// 1) number of partitions, 2) max columns to hash (0 means no limit).
+    RoundRobin(usize, usize),
 }
 
 impl CometPartitioning {
@@ -38,7 +42,7 @@ impl CometPartitioning {
         use CometPartitioning::*;
         match self {
             SinglePartition => 1,
-            Hash(_, n) | RangePartitioning(_, n, _, _) => *n,
+            Hash(_, n) | RangePartitioning(_, n, _, _) | RoundRobin(n, _) => 
*n,
         }
     }
 }
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs 
b/native/core/src/execution/shuffle/shuffle_writer.rs
index f21cde2ba..55d6a9ef9 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -382,8 +382,11 @@ impl MultiPartitionShuffleRepartitioner {
         // The initial values are not used.
         let scratch = ScratchSpace {
             hashes_buf: match partitioning {
-                // Only allocate the hashes_buf if hash partitioning.
-                CometPartitioning::Hash(_, _) => vec![0; batch_size],
+                // Allocate hashes_buf for hash and round robin partitioning.
+                // Round robin hashes all columns to achieve even, 
deterministic distribution.
+                CometPartitioning::Hash(_, _) | 
CometPartitioning::RoundRobin(_, _) => {
+                    vec![0; batch_size]
+                }
                 _ => vec![],
             },
             partition_ids: vec![0; batch_size],
@@ -598,6 +601,68 @@ impl MultiPartitionShuffleRepartitioner {
                 .await?;
                 self.scratch = scratch;
             }
+            CometPartitioning::RoundRobin(num_output_partitions, 
max_hash_columns) => {
+                // Comet implements "round robin" as hash partitioning on 
columns.
+                // This achieves the same goal as Spark's round robin (even 
distribution
+                // without semantic grouping) while being deterministic for 
fault tolerance.
+                //
+                // Note: This produces different partition assignments than 
Spark's round robin,
+                // which sorts by UnsafeRow binary representation before 
assigning partitions.
+                // However, both approaches provide even distribution and 
determinism.
+                let mut scratch = std::mem::take(&mut self.scratch);
+                let (partition_starts, partition_row_indices): (&Vec<u32>, 
&Vec<u32>) = {
+                    let mut timer = self.metrics.repart_time.timer();
+
+                    let num_rows = input.num_rows();
+
+                    // Collect columns for hashing, respecting 
max_hash_columns limit
+                    // max_hash_columns of 0 means no limit (hash all columns)
+                    // Negative values are normalized to 0 in the planner
+                    let num_columns_to_hash = if *max_hash_columns == 0 {
+                        input.num_columns()
+                    } else {
+                        (*max_hash_columns).min(input.num_columns())
+                    };
+                    let columns_to_hash: Vec<ArrayRef> = 
(0..num_columns_to_hash)
+                        .map(|i| Arc::clone(input.column(i)))
+                        .collect();
+
+                    // Use identical seed as Spark hash partitioning.
+                    let hashes_buf = &mut scratch.hashes_buf[..num_rows];
+                    hashes_buf.fill(42_u32);
+
+                    // Compute hash for selected columns
+                    create_murmur3_hashes(&columns_to_hash, hashes_buf)?;
+
+                    // Assign partition IDs based on hash (same as hash 
partitioning)
+                    let partition_ids = &mut scratch.partition_ids[..num_rows];
+                    hashes_buf.iter().enumerate().for_each(|(idx, hash)| {
+                        partition_ids[idx] = pmod(*hash, 
*num_output_partitions) as u32;
+                    });
+
+                    // We now have partition ids for every input row, map that 
to partition starts
+                    // and partition indices to eventually write these rows to 
partition buffers.
+                    map_partition_ids_to_starts_and_indices(
+                        &mut scratch,
+                        *num_output_partitions,
+                        num_rows,
+                    );
+
+                    timer.stop();
+                    Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
+                        &scratch.partition_starts,
+                        &scratch.partition_row_indices,
+                    ))
+                }?;
+
+                self.buffer_partitioned_batch_may_spill(
+                    input,
+                    partition_row_indices,
+                    partition_starts,
+                )
+                .await?;
+                self.scratch = scratch;
+            }
             other => {
                 // this should be unreachable as long as the validation logic
                 // in the constructor is kept up-to-date
@@ -1431,6 +1496,7 @@ mod test {
                 Arc::new(row_converter),
                 owned_rows,
             ),
+            CometPartitioning::RoundRobin(num_partitions, 0),
         ] {
             let batches = (0..num_batches).map(|_| 
batch.clone()).collect::<Vec<_>>();
 
@@ -1483,4 +1549,95 @@ mod test {
         let expected = vec![69, 5, 193, 171, 115];
         assert_eq!(result, expected);
     }
+
+    #[test]
+    #[cfg_attr(miri, ignore)]
+    fn test_round_robin_deterministic() {
+        // Test that round robin partitioning produces identical results when 
run multiple times
+        use std::fs;
+        use std::io::Read;
+
+        let batch_size = 1000;
+        let num_batches = 10;
+        let num_partitions = 8;
+
+        let batch = create_batch(batch_size);
+        let batches = (0..num_batches).map(|_| 
batch.clone()).collect::<Vec<_>>();
+
+        // Run shuffle twice and compare results
+        for run in 0..2 {
+            let data_file = format!("/tmp/rr_data_{}.out", run);
+            let index_file = format!("/tmp/rr_index_{}.out", run);
+
+            let partitions = std::slice::from_ref(&batches);
+            let exec = ShuffleWriterExec::try_new(
+                Arc::new(DataSourceExec::new(Arc::new(
+                    MemorySourceConfig::try_new(partitions, batch.schema(), 
None).unwrap(),
+                ))),
+                CometPartitioning::RoundRobin(num_partitions, 0),
+                CompressionCodec::Zstd(1),
+                data_file.clone(),
+                index_file.clone(),
+                false,
+                1024 * 1024,
+            )
+            .unwrap();
+
+            let config = SessionConfig::new();
+            let runtime_env = Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_limit(10 * 1024 * 1024, 1.0)
+                    .build()
+                    .unwrap(),
+            );
+            let session_ctx = 
Arc::new(SessionContext::new_with_config_rt(config, runtime_env));
+            let task_ctx = Arc::new(TaskContext::from(session_ctx.as_ref()));
+
+            // Execute the shuffle
+            futures::executor::block_on(async {
+                let mut stream = exec.execute(0, 
Arc::clone(&task_ctx)).unwrap();
+                while stream.next().await.is_some() {}
+            });
+
+            if run == 1 {
+                // Compare data files
+                let mut data0 = Vec::new();
+                fs::File::open("/tmp/rr_data_0.out")
+                    .unwrap()
+                    .read_to_end(&mut data0)
+                    .unwrap();
+                let mut data1 = Vec::new();
+                fs::File::open("/tmp/rr_data_1.out")
+                    .unwrap()
+                    .read_to_end(&mut data1)
+                    .unwrap();
+                assert_eq!(
+                    data0, data1,
+                    "Round robin shuffle data should be identical across runs"
+                );
+
+                // Compare index files
+                let mut index0 = Vec::new();
+                fs::File::open("/tmp/rr_index_0.out")
+                    .unwrap()
+                    .read_to_end(&mut index0)
+                    .unwrap();
+                let mut index1 = Vec::new();
+                fs::File::open("/tmp/rr_index_1.out")
+                    .unwrap()
+                    .read_to_end(&mut index1)
+                    .unwrap();
+                assert_eq!(
+                    index0, index1,
+                    "Round robin shuffle index should be identical across runs"
+                );
+            }
+        }
+
+        // Clean up
+        let _ = fs::remove_file("/tmp/rr_data_0.out");
+        let _ = fs::remove_file("/tmp/rr_index_0.out");
+        let _ = fs::remove_file("/tmp/rr_data_1.out");
+        let _ = fs::remove_file("/tmp/rr_index_1.out");
+    }
 }
diff --git a/native/proto/src/proto/partitioning.proto 
b/native/proto/src/proto/partitioning.proto
index e11d7a384..e70b8264f 100644
--- a/native/proto/src/proto/partitioning.proto
+++ b/native/proto/src/proto/partitioning.proto
@@ -31,6 +31,7 @@ message Partitioning {
     HashPartition hash_partition = 1;
     SinglePartition single_partition = 2;
     RangePartition range_partition = 3;
+    RoundRobinPartition round_robin_partition = 4;
   }
 }
 
@@ -51,3 +52,9 @@ message RangePartition {
   int32 num_partitions = 2;
   repeated BoundaryRow boundary_rows = 4;
 }
+
+message RoundRobinPartition {
+  int32 num_partitions = 1;
+  // Maximum number of columns to hash. 0 means no limit (hash all columns).
+  int32 max_hash_columns = 2;
+}
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
 
b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
index b5d15b41f..3fc222bd1 100644
--- 
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
+++ 
b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, 
ShuffleWriteMetricsReporter, ShuffleWriter}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
Literal}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, RangePartitioning, SinglePartition}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
 import org.apache.spark.sql.comet.{CometExec, CometMetricNode}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -292,6 +292,16 @@ class CometNativeShuffleWriter[K, V](
           shuffleWriterBuilder.setPartitioning(
             partitioningBuilder.setRangePartition(partitioning).build())
 
+        case _: RoundRobinPartitioning =>
+          val partitioning = 
PartitioningOuterClass.RoundRobinPartition.newBuilder()
+          partitioning.setNumPartitions(outputPartitioning.numPartitions)
+          partitioning.setMaxHashColumns(
+            
CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_MAX_HASH_COLUMNS.get())
+
+          val partitioningBuilder = 
PartitioningOuterClass.Partitioning.newBuilder()
+          shuffleWriterBuilder.setPartitioning(
+            partitioningBuilder.setRoundRobinPartition(partitioning).build())
+
         case _ =>
           throw new UnsupportedOperationException(
             s"Partitioning $outputPartitioning is not supported.")
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
 
b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index 1805711d0..d65a6b21f 100644
--- 
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ 
b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -266,12 +266,31 @@ object CometShuffleExchangeExec
     def supportedHashPartitioningDataType(dt: DataType): Boolean = dt match {
       case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: 
LongType |
           _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: 
TimestampType |
-          _: TimestampNTZType | _: DecimalType | _: DateType =>
+          _: TimestampNTZType | _: DateType =>
+        true
+      case _: DecimalType =>
+        // TODO enforce this check
+        // https://github.com/apache/datafusion-comet/issues/3079
+        // Decimals with precision > 18 require Java BigDecimal conversion 
before hashing
+        // d.precision <= 18
         true
       case _ =>
         false
     }
 
+    /**
+     * Check if a data type contains a decimal with precision > 18. Such 
decimals require
+     * conversion to Java BigDecimal before hashing, which is not supported in 
native shuffle.
+     */
+    def containsHighPrecisionDecimal(dt: DataType): Boolean = dt match {
+      case d: DecimalType => d.precision > 18
+      case StructType(fields) => fields.exists(f => 
containsHighPrecisionDecimal(f.dataType))
+      case ArrayType(elementType, _) => 
containsHighPrecisionDecimal(elementType)
+      case MapType(keyType, valueType, _) =>
+        containsHighPrecisionDecimal(keyType) || 
containsHighPrecisionDecimal(valueType)
+      case _ => false
+    }
+
     /**
      * Determine which data types are supported as partition columns in native 
shuffle.
      *
@@ -384,6 +403,14 @@ object CometShuffleExchangeExec
           }
         }
         supported
+      case RoundRobinPartitioning(_) =>
+        val config = 
CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED
+        if (!config.get(conf)) {
+          withInfo(s, s"${config.key} is disabled")
+          return false
+        }
+        // RoundRobin partitioning uses position-based distribution matching 
Spark's behavior
+        true
       case _ =>
         withInfo(
           s,
@@ -395,7 +422,7 @@ object CometShuffleExchangeExec
   /**
    * Check if JVM-based columnar shuffle (CometColumnarExchange) can be used 
for this shuffle. JVM
    * shuffle is used when the child plan is not a Comet native operator, or 
when native shuffle
-   * doesn't support the required partitioning type (e.g., 
RoundRobinPartitioning).
+   * doesn't support the required partitioning type.
    */
   def columnarShuffleSupported(s: ShuffleExchangeExec): Boolean = {
 
diff --git 
a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
index a682ff91a..1cf43ea59 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
@@ -388,4 +388,53 @@ class CometNativeShuffleSuite extends CometTestBase with 
AdaptiveSparkPlanHelper
       checkSparkAnswer(df)
     }
   }
+
+  test("native shuffle: round robin partitioning") {
+    withSQLConf(
+      CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key 
-> "true") {
+      withParquetTable((0 until 100).map(i => (i, (i + 1).toLong, s"str$i")), 
"tbl") {
+        val df = sql("SELECT * FROM tbl")
+
+        // Test basic round robin repartitioning
+        val shuffled = df.repartition(10)
+
+        // Just collect and verify row count - simpler test
+        val result = shuffled.collect()
+        assert(result.length == 100, s"Expected 100 rows, got 
${result.length}")
+      }
+    }
+  }
+
+  test("native shuffle: round robin deterministic behavior") {
+    // Test that round robin produces consistent results across multiple 
executions
+    withSQLConf(
+      CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key 
-> "true") {
+      withParquetTable((0 until 1000).map(i => (i, (i + 1).toLong, s"str$i")), 
"tbl") {
+        val df = sql("SELECT * FROM tbl")
+
+        // Execute shuffle twice and compare results
+        val result1 = df.repartition(8).collect().toSeq
+        val result2 = df.repartition(8).collect().toSeq
+
+        // Results should be identical (deterministic ordering)
+        assert(result1 == result2, "Round robin shuffle should produce 
deterministic results")
+      }
+    }
+  }
+
+  test("native shuffle: round robin with filter") {
+    withSQLConf(
+      CometConf.COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED.key 
-> "true") {
+      withParquetTable((0 until 100).map(i => (i, (i + 1).toLong)), "tbl") {
+        val df = sql("SELECT * FROM tbl")
+        val shuffled = df
+          .filter($"_1" < 50)
+          .repartition(10)
+
+        // Just collect and verify - simpler test
+        val result = shuffled.collect()
+        assert(result.length == 50, s"Expected 50 rows after filter, got 
${result.length}")
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to