This is an automated email from the ASF dual-hosted git repository.
jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 809fef6aa [GLUTEN-6656][UNIFFLE] VeloxUniffleColumnarShuffleWriter
should send commit for all ColumnBatch with empty rows (#6698)
809fef6aa is described below
commit 809fef6aae21139d0ed690fed72662383ef52235
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Aug 5 09:26:24 2024 +0800
[GLUTEN-6656][UNIFFLE] VeloxUniffleColumnarShuffleWriter should send commit
for all ColumnBatch with empty rows (#6698)
---
.../writer/VeloxUniffleColumnarShuffleWriter.java | 21 ++++++++++++++-------
1 file changed, 14 insertions(+), 7 deletions(-)
diff --git
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index a80e34fb1..d2032fa48 100644
---
a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++
b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -126,8 +126,8 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
@Override
protected void writeImpl(Iterator<Product2<K, V>> records) {
- if (!records.hasNext() && !isMemoryShuffleEnabled) {
- super.sendCommit();
+ if (!records.hasNext()) {
+ sendCommit();
return;
}
// writer already init
@@ -189,11 +189,13 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
}
}
- long startTime = System.nanoTime();
LOG.info("nativeShuffleWriter value {}", nativeShuffleWriter);
+ // If all of the ColumnarBatch have empty rows, the nativeShuffleWriter
still equals -1
if (nativeShuffleWriter == -1L) {
- throw new IllegalStateException("nativeShuffleWriter should not be -1L");
+ sendCommit();
+ return;
}
+ long startTime = System.nanoTime();
SplitResult splitResult;
try {
splitResult = jniWrapper.stop(nativeShuffleWriter);
@@ -219,9 +221,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
long pushMergedDataTime = System.nanoTime();
// clear all
sendRestBlockAndWait();
- if (!isMemoryShuffleEnabled) {
- super.sendCommit();
- }
+ sendCommit();
long writeDurationMs = System.nanoTime() - pushMergedDataTime;
shuffleWriteMetrics.incWriteTime(writeDurationMs);
LOG.info(
@@ -229,6 +229,13 @@ public class VeloxUniffleColumnarShuffleWriter<K, V>
extends RssShuffleWriter<K,
TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
}
+ @Override
+ protected void sendCommit() {
+ if (!isMemoryShuffleEnabled) {
+ super.sendCommit();
+ }
+ }
+
@Override
public Option<MapStatus> stop(boolean success) {
if (!stopping) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]