This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 0636e3ca4 [CELEBORN-654][SPARK] SortBasedShuffleWriter does not
require mapStatusRecords in Spark 3
0636e3ca4 is described below
commit 0636e3ca409beff9ed893a9b4e54f19d60b80435
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jun 9 09:43:08 2023 +0800
[CELEBORN-654][SPARK] SortBasedShuffleWriter does not require
mapStatusRecords in Spark 3
### What changes were proposed in this pull request?
`mapStatusRecords` is required in Spark 2 for constructing `MapStatus` when
AQE is enabled, but it is not needed in Spark 3, so this PR removes it from the
Spark 3 `SortBasedShuffleWriter` to save memory and compute resources.
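This PR also simplifies the `for` loops in both the Spark 2 and Spark 3 writers,
e.g. by replacing index-based loops over `pushers` with enhanced `for` loops.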
### Why are the changes needed?
Remove unnecessary variables to save resources and clean up code.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passes GA (GitHub Actions CI).
Closes #1564 from pan3793/CELEBORN-654.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../spark/shuffle/celeborn/SortBasedShuffleWriter.java | 14 ++++++--------
.../spark/shuffle/celeborn/SortBasedShuffleWriter.java | 17 ++++++-----------
2 files changed, 12 insertions(+), 19 deletions(-)
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index edbfa529f..af3207e83 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -323,10 +323,10 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
long pushStartTime = System.nanoTime();
if (pipelined) {
- for (int i = 0; i < pushers.length; i++) {
- pushers[i].waitPushFinish();
- pushers[i].pushData();
- pushers[i].close();
+ for (SortBasedPusher pusher : pushers) {
+ pusher.waitPushFinish();
+ pusher.pushData();
+ pusher.close();
}
} else {
currentPusher.pushData();
@@ -344,13 +344,11 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
private void updateMapStatus() {
- long recordsWritten = 0;
- for (int i = 0; i < partitioner.numPartitions(); i++) {
+ for (int i = 0; i < tmpRecords.length; i++) {
mapStatusRecords[i] += tmpRecords[i];
- recordsWritten += tmpRecords[i];
+ writeMetrics.incRecordsWritten(tmpRecords[i]);
tmpRecords[i] = 0;
}
- writeMetrics.incRecordsWritten(recordsWritten);
}
@Override
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 070aa828f..ec92b0e46 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -83,7 +83,6 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private final SerializationStream serOutputStream;
private final LongAdder[] mapStatusLengths;
- private final long[] mapStatusRecords;
private final long[] tmpRecords;
/**
@@ -124,7 +123,6 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
serOutputStream = serializer.serializeStream(serBuffer);
this.mapStatusLengths = new LongAdder[numPartitions];
- this.mapStatusRecords = new long[numPartitions];
for (int i = 0; i < numPartitions; i++) {
this.mapStatusLengths[i] = new LongAdder();
}
@@ -331,10 +329,10 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
long pushStartTime = System.nanoTime();
if (pipelined) {
- for (int i = 0; i < pushers.length; i++) {
- pushers[i].waitPushFinish();
- pushers[i].pushData();
- pushers[i].close();
+ for (SortBasedPusher pusher : pushers) {
+ pusher.waitPushFinish();
+ pusher.pushData();
+ pusher.close();
}
} else {
currentPusher.pushData();
@@ -354,13 +352,10 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
private void updateMapStatus() {
- long recordsWritten = 0;
- for (int i = 0; i < partitioner.numPartitions(); i++) {
- mapStatusRecords[i] += tmpRecords[i];
- recordsWritten += tmpRecords[i];
+ for (int i = 0; i < tmpRecords.length; i++) {
+ writeMetrics.incRecordsWritten(tmpRecords[i]);
tmpRecords[i] = 0;
}
- writeMetrics.incRecordsWritten(recordsWritten);
}
@Override