This is an automated email from the ASF dual-hosted git repository.

jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 809fef6aa [GLUTEN-6656][UNIFFLE] VeloxUniffleColumnarShuffleWriter 
should send commit for all ColumnBatch with empty rows (#6698)
809fef6aa is described below

commit 809fef6aae21139d0ed690fed72662383ef52235
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Aug 5 09:26:24 2024 +0800

    [GLUTEN-6656][UNIFFLE] VeloxUniffleColumnarShuffleWriter should send commit 
for all ColumnBatch with empty rows (#6698)
---
 .../writer/VeloxUniffleColumnarShuffleWriter.java   | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git 
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
 
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index a80e34fb1..d2032fa48 100644
--- 
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++ 
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -126,8 +126,8 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> 
extends RssShuffleWriter<K,
 
   @Override
   protected void writeImpl(Iterator<Product2<K, V>> records) {
-    if (!records.hasNext() && !isMemoryShuffleEnabled) {
-      super.sendCommit();
+    if (!records.hasNext()) {
+      sendCommit();
       return;
     }
     // writer already init
@@ -189,11 +189,13 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> 
extends RssShuffleWriter<K,
       }
     }
 
-    long startTime = System.nanoTime();
     LOG.info("nativeShuffleWriter value {}", nativeShuffleWriter);
+    // If all of the ColumnarBatch have empty rows, the nativeShuffleWriter 
still equals -1
     if (nativeShuffleWriter == -1L) {
-      throw new IllegalStateException("nativeShuffleWriter should not be -1L");
+      sendCommit();
+      return;
     }
+    long startTime = System.nanoTime();
     SplitResult splitResult;
     try {
       splitResult = jniWrapper.stop(nativeShuffleWriter);
@@ -219,9 +221,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> 
extends RssShuffleWriter<K,
     long pushMergedDataTime = System.nanoTime();
     // clear all
     sendRestBlockAndWait();
-    if (!isMemoryShuffleEnabled) {
-      super.sendCommit();
-    }
+    sendCommit();
     long writeDurationMs = System.nanoTime() - pushMergedDataTime;
     shuffleWriteMetrics.incWriteTime(writeDurationMs);
     LOG.info(
@@ -229,6 +229,13 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> 
extends RssShuffleWriter<K,
         TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
   }
 
+  @Override
+  protected void sendCommit() {
+    if (!isMemoryShuffleEnabled) {
+      super.sendCommit();
+    }
+  }
+
   @Override
   public Option<MapStatus> stop(boolean success) {
     if (!stopping) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to