This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c16a04e496 [GLUTEN-11037][VL] Corrected logic for updating uniffle
shuffle metrics (#11038)
c16a04e496 is described below
commit c16a04e4960cd2f12459dfb21f5412315d352c6a
Author: Zhen Wang <[email protected]>
AuthorDate: Sat Nov 8 04:34:09 2025 +0800
[GLUTEN-11037][VL] Corrected logic for updating uniffle shuffle metrics
(#11038)
---
.../writer/VeloxUniffleColumnarShuffleWriter.java | 33 +++++++++++++++-------
1 file changed, 23 insertions(+), 10 deletions(-)
diff --git a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index 0951e0b809..da2cb20c9a 100644
--- a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++ b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -81,6 +81,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
private final int nativeBufferSize = GlutenConfig.get().maxBatchSize();
private final int bufferSize;
private final int numPartitions;
+ private final boolean isSort;
private final ColumnarShuffleDependency<K, V, V> columnarDep;
private final SparkConf sparkConf;
@@ -136,6 +137,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
this.codecBackend = codecBackend.get();
}
}
+ isSort = columnarDep.shuffleWriterType().equals(SortShuffleWriterType$.MODULE$);
}
@Override
@@ -163,7 +165,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
bufferSize,
partitionPusher);
- if (columnarDep.shuffleWriterType().equals(SortShuffleWriterType$.MODULE$)) {
+ if (isSort) {
nativeShuffleWriter =
shuffleWriterJniWrapper.createSortShuffleWriter(
numPartitions,
@@ -232,15 +234,26 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
throw new RssException(e);
}
columnarDep.metrics().get("shuffleWallTime").get().add(System.nanoTime() - startTime);
- columnarDep
- .metrics()
- .get("splitTime")
- .get()
- .add(
- columnarDep.metrics().get("shuffleWallTime").get().value()
- - splitResult.getTotalPushTime()
- - splitResult.getTotalWriteTime()
- - splitResult.getTotalCompressTime());
+ if (!isSort) {
+ columnarDep
+ .metrics()
+ .get("splitTime")
+ .get()
+ .add(
+ columnarDep.metrics().get("shuffleWallTime").get().value()
+ - splitResult.getTotalPushTime()
+ - splitResult.getTotalWriteTime()
+ - splitResult.getTotalCompressTime());
+ columnarDep
+ .metrics()
+ .get("avgDictionaryFields")
+ .get()
+ .set(splitResult.getAvgDictionaryFields());
+ columnarDep.metrics().get("dictionarySize").get().add(splitResult.getDictionarySize());
+ } else {
+ columnarDep.metrics().get("sortTime").get().add(splitResult.getSortTime());
+ columnarDep.metrics().get("c2rTime").get().add(splitResult.getC2RTime());
+ }
// bytesWritten is calculated in uniffle side: WriteBufferManager.createShuffleBlock
// shuffleWriteMetrics.incBytesWritten(splitResult.getTotalBytesWritten());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]