This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 164e0d024 [#1177] improvement: Reduce the write time of tasks (#1179)
164e0d024 is described below
commit 164e0d02456763f661fb55cc47459312bf4e80b4
Author: roryqi <[email protected]>
AuthorDate: Thu Aug 31 10:05:12 2023 +0800
[#1177] improvement: Reduce the write time of tasks (#1179)
### What changes were proposed in this pull request?
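Register a completion callback on each `AddBlockEvent` that pushes a marker into a
`BlockingQueue`, and have `checkBlockSendResult` wait on that queue (with the remaining
`sendCheckTimeout` as the poll timeout) instead of sleeping for a fixed `sendCheckInterval`
between checks.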
### Why are the changes needed?
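Fixes #1177. The previous loop slept for `sendCheckInterval` between checks, so a task could
keep waiting even after its last block had already been sent.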
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI passed.
---
.../spark/shuffle/writer/RssShuffleWriter.java | 51 +++++++++++++++++-----
1 file changed, 41 insertions(+), 10 deletions(-)
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index fb0c78502..330f56c8d 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -24,10 +24,12 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -93,6 +95,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
protected final ShuffleWriteMetrics shuffleWriteMetrics;
+ private final BlockingQueue<Object> finishEventQueue = new
LinkedBlockingQueue<>();
+
// Only for tests
@VisibleForTesting
public RssShuffleWriter(
@@ -293,6 +297,13 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
List<ShuffleBlockInfo> shuffleBlockInfoList) {
List<CompletableFuture<Long>> futures = new ArrayList<>();
for (AddBlockEvent event :
bufferManager.buildBlockEvents(shuffleBlockInfoList)) {
+ event.addCallback(
+ () -> {
+ boolean ret = finishEventQueue.add(new Object());
+ if (!ret) {
+ LOG.error("Add event " + event + " to finishEventQueue fail");
+ }
+ });
futures.add(shuffleManager.sendData(event));
}
return futures;
@@ -300,17 +311,33 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
@VisibleForTesting
protected void checkBlockSendResult(Set<Long> blockIds) {
- long start = System.currentTimeMillis();
- while (true) {
- checkIfBlocksFailed();
- Set<Long> successBlockIds = shuffleManager.getSuccessBlockIds(taskId);
- blockIds.removeAll(successBlockIds);
- if (blockIds.isEmpty()) {
- break;
+ boolean interrupted = false;
+
+ try {
+ long remainingMs = sendCheckTimeout;
+ long end = System.currentTimeMillis() + remainingMs;
+
+ while (true) {
+ try {
+ finishEventQueue.clear();
+ checkIfBlocksFailed();
+ Set<Long> successBlockIds =
shuffleManager.getSuccessBlockIds(taskId);
+ blockIds.removeAll(successBlockIds);
+ if (blockIds.isEmpty()) {
+ break;
+ }
+ if (finishEventQueue.isEmpty()) {
+ remainingMs = Math.max(end - System.currentTimeMillis(), 0);
+ Object event = finishEventQueue.poll(remainingMs,
TimeUnit.MILLISECONDS);
+ if (event == null) {
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
}
- LOG.info("Wait " + blockIds.size() + " blocks sent to shuffle server");
- Uninterruptibles.sleepUninterruptibly(sendCheckInterval,
TimeUnit.MILLISECONDS);
- if (System.currentTimeMillis() - start > sendCheckTimeout) {
+ if (!blockIds.isEmpty()) {
String errorMsg =
"Timeout: Task["
+ taskId
@@ -322,6 +349,10 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
LOG.error(errorMsg);
throw new RssException(errorMsg);
}
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
}
}