This is an automated email from the ASF dual-hosted git repository.

taiyangli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 21618c9e4 [GLUTEN-6656][CELEBORN] Fix CelebornColumnarShuffleWriter 
assertion failed (#6657)
21618c9e4 is described below

commit 21618c9e40be6c7240ae17c737ef8eed2be1dcec
Author: exmy <[email protected]>
AuthorDate: Thu Aug 1 11:24:49 2024 +0800

    [GLUTEN-6656][CELEBORN] Fix CelebornColumnarShuffleWriter assertion failed 
(#6657)
---
 .../apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala |  7 ++++++-
 .../apache/spark/shuffle/CelebornColumnarShuffleWriter.scala   | 10 +++++++---
 .../spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala     |  7 ++++++-
 3 files changed, 19 insertions(+), 5 deletions(-)

diff --git 
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
 
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
index 5279426c1..ae22a0890 100644
--- 
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
+++ 
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
@@ -71,7 +71,12 @@ class CHCelebornColumnarShuffleWriter[K, V](
       }
     }
 
-    assert(nativeShuffleWriter != -1L)
+    // If all of the ColumnarBatch have empty rows, the nativeShuffleWriter 
still equals -1
+    if (nativeShuffleWriter == -1L) {
+      handleEmptyIterator()
+      return
+    }
+
     splitResult = jniWrapper.stop(nativeShuffleWriter)
 
     dep.metrics("splitTime").add(splitResult.getSplitTime)
diff --git 
a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
 
b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
index 8082934c2..f71fadd4c 100644
--- 
a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
+++ 
b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
@@ -111,9 +111,7 @@ abstract class CelebornColumnarShuffleWriter[K, V](
   @throws[IOException]
   final override def write(records: Iterator[Product2[K, V]]): Unit = {
     if (!records.hasNext) {
-      partitionLengths = new Array[Long](dep.partitioner.numPartitions)
-      client.mapperEnd(shuffleId, mapId, context.attemptNumber, numMappers)
-      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, 
mapId)
+      handleEmptyIterator()
       return
     }
     internalWrite(records)
@@ -161,4 +159,10 @@ abstract class CelebornColumnarShuffleWriter[K, V](
     client.mapperEnd(shuffleId, mapId, context.attemptNumber, numMappers)
     writeMetrics.incWriteTime(System.nanoTime - pushMergedDataTime)
   }
+
+  def handleEmptyIterator(): Unit = {
+    partitionLengths = new Array[Long](dep.partitioner.numPartitions)
+    client.mapperEnd(shuffleId, mapId, context.attemptNumber, numMappers)
+    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, 
mapId)
+  }
 }
diff --git 
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
 
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
index 8f613c728..c14d46a52 100644
--- 
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
+++ 
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
@@ -82,7 +82,12 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
       }
     }
 
-    assert(nativeShuffleWriter != -1L)
+    // If all of the ColumnarBatch have empty rows, the nativeShuffleWriter 
still equals -1
+    if (nativeShuffleWriter == -1L) {
+      handleEmptyIterator()
+      return
+    }
+
     val startTime = System.nanoTime()
     splitResult = jniWrapper.stop(nativeShuffleWriter)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to