This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 2bda175b1 [CELEBORN-673][SPARK][PERF] Improve the perf of sort-based shuffle write
2bda175b1 is described below
commit 2bda175b1761b6985ce155299e5b505ee342be94
Author: Fu Chen <[email protected]>
AuthorDate: Tue Jun 13 19:07:04 2023 +0800
[CELEBORN-673][SPARK][PERF] Improve the perf of sort-based shuffle write
### What changes were proposed in this pull request?
1. `SQLShuffleWriteMetricsReporter#incWriteTime` is a performance killer; stop calling it for every inserted record and instead accumulate the elapsed time in a local variable, reporting it to the metric once after the write loop (see the sketch below)
2. Simplify the `incWriteTime` accounting for large records so that the measured write time also covers the memory copy into the giant-record buffer
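
As a minimal, self-contained sketch of the accumulation pattern (the `WriteTimeAccumulation` class, the `AtomicLong` metric holder, and the `insertOrPush` helper are illustrative stand-ins, not Celeborn's actual classes):

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class WriteTimeAccumulation {
  // Stand-in for the metric behind SQLShuffleWriteMetricsReporter#incWriteTime.
  static final AtomicLong writeTimeNanos = new AtomicLong();

  static void write(Iterator<byte[]> records) {
    long shuffleWriteTimeSum = 0L; // plain local accumulator, no metric call per record
    while (records.hasNext()) {
      byte[] record = records.next();
      long insertAndPushStartTime = System.nanoTime();
      insertOrPush(record); // timed section: buffer insert or giant-record push
      shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
    }
    // One metric update per write call replaces one update per record.
    writeTimeNanos.addAndGet(shuffleWriteTimeSum);
  }

  // Placeholder for currentPusher.insertRecord(...) / pushGiantRecord(...).
  static void insertOrPush(byte[] record) {}

  public static void main(String[] args) {
    write(List.of(new byte[8], new byte[16]).iterator());
    System.out.println("accumulated write time (ns): " + writeTimeNanos.get());
  }
}
```

Reading `System.nanoTime()` per record is cheap relative to updating the reporter, so deferring the single metric update takes the reporter off the per-record hot path.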
### Why are the changes needed?
Flame graph and stage duration before: (screenshots not preserved in this email)

Flame graph and stage duration after: (screenshots not preserved in this email)
### Does this PR introduce _any_ user-facing change?
No, only a performance improvement.
### How was this patch tested?
Tested locally.
Closes #1585 from cfmcgrady/shuffle-metrics.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 86cbf7a3596303fea207e55e7ecb696d724d5409)
Signed-off-by: Cheng Pan <[email protected]>
---
.../spark/shuffle/celeborn/SortBasedShuffleWriter.java | 14 ++++++++------
.../spark/shuffle/celeborn/SortBasedShuffleWriter.java | 18 +++++++++---------
2 files changed, 17 insertions(+), 15 deletions(-)
diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index d6abbe496..33292f3a7 100644
--- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -200,6 +200,7 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
SQLMetric dataSize =
SparkUtils.getUnsafeRowSerializerDataSizeMetric((UnsafeRowSerializer) dep.serializer());
+ long shuffleWriteTimeSum = 0L;
while (records.hasNext()) {
final Product2<Integer, UnsafeRow> record = records.next();
final int partitionId = record._1();
@@ -211,6 +212,7 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
dataSize.add(serializedRecordSize);
}
+ long insertAndPushStartTime = System.nanoTime();
if (serializedRecordSize > pushBufferMaxSize) {
byte[] giantBuffer = new byte[serializedRecordSize];
Platform.putInt(giantBuffer, Platform.BYTE_ARRAY_OFFSET, Integer.reverseBytes(rowSize));
@@ -222,7 +224,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
rowSize);
pushGiantRecord(partitionId, giantBuffer, serializedRecordSize);
} else {
- long insertStartTime = System.nanoTime();
boolean success =
currentPusher.insertRecord(
row.getBaseObject(), row.getBaseOffset(), rowSize, partitionId, true);
@@ -235,10 +236,11 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
throw new IOException("Unable to push after switching pusher!");
}
}
- writeMetrics.incWriteTime(System.nanoTime() - insertStartTime);
}
+ shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
tmpRecords[partitionId] += 1;
}
+ writeMetrics.incWriteTime(shuffleWriteTimeSum);
}
private void pushAndSwitch() throws IOException {
@@ -254,6 +256,7 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
private void write0(scala.collection.Iterator iterator) throws IOException {
final scala.collection.Iterator<Product2<K, ?>> records = iterator;
+ long shuffleWriteTimeSum = 0L;
while (records.hasNext()) {
final Product2<K, ?> record = records.next();
final K key = record._1();
@@ -266,10 +269,10 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
final int serializedRecordSize = serBuffer.size();
assert (serializedRecordSize > 0);
+ long insertAndPushStartTime = System.nanoTime();
if (serializedRecordSize > pushBufferMaxSize) {
pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
} else {
- long insertStartTime = System.nanoTime();
boolean success =
currentPusher.insertRecord(
serBuffer.getBuf(),
@@ -290,15 +293,15 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
throw new IOException("Unable to push after switching pusher!");
}
}
- writeMetrics.incWriteTime(System.nanoTime() - insertStartTime);
}
+ shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
tmpRecords[partitionId] += 1;
}
+ writeMetrics.incWriteTime(shuffleWriteTimeSum);
}
private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throws IOException {
logger.debug("Push giant record, size {}.", Utils.bytesToString(numBytes));
- long pushStartTime = System.nanoTime();
int bytesWritten =
rssShuffleClient.pushData(
appId,
@@ -313,7 +316,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
numPartitions);
mapStatusLengths[partitionId].add(bytesWritten);
writeMetrics.incBytesWritten(bytesWritten);
- writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
}
private void close() throws IOException {
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 47d8eb41e..a62e2c21a 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -222,6 +222,7 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
final scala.collection.Iterator<Product2<Integer, UnsafeRow>> records = iterator;
SQLMetric dataSize = SparkUtils.getDataSize((UnsafeRowSerializer) dep.serializer());
+ long shuffleWriteTimeSum = 0L;
while (records.hasNext()) {
final Product2<Integer, UnsafeRow> record = records.next();
final int partitionId = record._1();
@@ -234,6 +235,7 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
dataSize.add(serializedRecordSize);
}
+ long insertAndPushStartTime = System.nanoTime();
if (serializedRecordSize > pushBufferMaxSize) {
byte[] giantBuffer = new byte[serializedRecordSize];
Platform.putInt(giantBuffer, Platform.BYTE_ARRAY_OFFSET, Integer.reverseBytes(rowSize));
@@ -245,7 +247,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
rowSize);
pushGiantRecord(partitionId, giantBuffer, serializedRecordSize);
} else {
- long insertStartTime = System.nanoTime();
boolean success =
currentPusher.insertRecord(
row.getBaseObject(), row.getBaseOffset(), rowSize, partitionId, true);
@@ -258,10 +259,11 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
throw new CelebornIOException("Unable to push after switching pusher!");
}
}
- writeMetrics.incWriteTime(System.nanoTime() - insertStartTime);
}
+ shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
tmpRecords[partitionId] += 1;
}
+ writeMetrics.incWriteTime(shuffleWriteTimeSum);
}
private void pushAndSwitch() throws IOException {
@@ -277,6 +279,7 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
private void write0(scala.collection.Iterator iterator) throws IOException {
final scala.collection.Iterator<Product2<K, ?>> records = iterator;
+ long shuffleWriteTimeSum = 0L;
while (records.hasNext()) {
final Product2<K, ?> record = records.next();
final K key = record._1();
@@ -289,10 +292,10 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
final int serializedRecordSize = serBuffer.size();
assert (serializedRecordSize > 0);
+ long insertAndPushStartTime = System.nanoTime();
if (serializedRecordSize > pushBufferMaxSize) {
pushGiantRecord(partitionId, serBuffer.getBuf(), serializedRecordSize);
} else {
- long insertStartTime = System.nanoTime();
boolean success =
currentPusher.insertRecord(
serBuffer.getBuf(),
@@ -313,15 +316,15 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
throw new IOException("Unable to push after switching pusher!");
}
}
- writeMetrics.incWriteTime(System.nanoTime() - insertStartTime);
}
+ shuffleWriteTimeSum += System.nanoTime() - insertAndPushStartTime;
tmpRecords[partitionId] += 1;
}
+ writeMetrics.incWriteTime(shuffleWriteTimeSum);
}
private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throws IOException {
logger.debug("Push giant record, size {}.", Utils.bytesToString(numBytes));
- long pushStartTime = System.nanoTime();
int bytesWritten =
rssShuffleClient.pushData(
appId,
@@ -336,7 +339,6 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
numPartitions);
mapStatusLengths[partitionId].add(bytesWritten);
writeMetrics.incBytesWritten(bytesWritten);
- writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
}
private void close() throws IOException {
@@ -357,11 +359,9 @@ public class SortBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
currentPusher.pushData();
currentPusher.close();
}
- writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
- long pushMergedDataTime = System.nanoTime();
rssShuffleClient.pushMergedData(appId, shuffleId, mapId, taskContext.attemptNumber());
- writeMetrics.incWriteTime(System.nanoTime() - pushMergedDataTime);
+ writeMetrics.incWriteTime(System.nanoTime() - pushStartTime);
updateMapStatus();