This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 4ce4a8dd fix(client): disable spark memory spill (#844)
4ce4a8dd is described below
commit 4ce4a8ddfa7edccee033e73ca16face959a34c1b
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Apr 26 22:32:14 2023 +0800
fix(client): disable spark memory spill (#844)
### What changes were proposed in this pull request?
Disable the memory spill operation.
### Why are the changes needed?
In #714, memory spill was introduced to solve a deadlock. Unfortunately,
that part of the code must be handled carefully, covering concurrency and data
consistency, as shown by the fix PR #811. This part still has bugs, which I
will fix in
the coming days.
For now I would like to revert PR #714, but the partial refactor from #714
is still meaningful. So
I am submitting this PR to disable the memory spill instead.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No new tests are needed; this change only disables the memory spill path.
---
.../spark/shuffle/writer/WriteBufferManager.java | 25 +---------------------
.../shuffle/writer/WriteBufferManagerTest.java | 1 -
2 files changed, 1 insertion(+), 25 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 41687636..bbabeb50 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -22,11 +22,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
-import java.util.stream.Collectors;
import scala.reflect.ClassTag$;
import scala.reflect.ManifestFactory$;
@@ -324,27 +321,7 @@ public class WriteBufferManager extends MemoryConsumer {
@Override
public long spill(long size, MemoryConsumer trigger) {
- List<AddBlockEvent> events = buildBlockEvents(clear());
- List<CompletableFuture<Long>> futures = events.stream().map(x ->
spillFunc.apply(x)).collect(Collectors.toList());
- CompletableFuture<Void> allOfFutures =
- CompletableFuture.allOf(futures.toArray(new
CompletableFuture[futures.size()]));
- try {
- allOfFutures.get(memorySpillTimeoutSec, TimeUnit.SECONDS);
- } catch (TimeoutException timeoutException) {
- // A best effort strategy to wait.
- // If timeout exception occurs, the underlying tasks won't be cancelled.
- } finally {
- long releasedSize = futures.stream().filter(x -> x.isDone()).mapToLong(x
-> {
- try {
- return x.get();
- } catch (Exception e) {
- return 0;
- }
- }).sum();
- LOG.info("[taskId: {}] Spill triggered by memory consumer of {},
released memory size: {}",
- taskId, trigger.getClass().getSimpleName(), releasedSize);
- return releasedSize;
- }
+ return 0L;
}
@VisibleForTesting
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
index 87580541..5e5e6dfb 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
@@ -221,7 +221,6 @@ public class WriteBufferManagerTest {
assertEquals(3, events.size());
}
- @Test
public void spillTest() {
SparkConf conf = getConf();
conf.set("spark.rss.client.send.size.limit", "1000");