This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ce4a8dd fix(client): disable spark memory spill (#844)
4ce4a8dd is described below

commit 4ce4a8ddfa7edccee033e73ca16face959a34c1b
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Apr 26 22:32:14 2023 +0800

    fix(client): disable spark memory spill (#844)
    
    ### What changes were proposed in this pull request?
    
    Disable the memory spill operation.
    
    ### Why are the changes needed?
    
    In #714, the memory spill was introduced to solve a deadlock. Unfortunately,
    this part of the code must be handled carefully, including concurrency and
    data consistency, as in the fix PR #811. This part still has bugs, and I will
    fix them in the next few days.
    
    I would like to revert PR #714, but part of the refactoring in #714 is still
    meaningful. So I am submitting this PR to only disable the memory spill
    instead.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No tests needed.
---
 .../spark/shuffle/writer/WriteBufferManager.java   | 25 +---------------------
 .../shuffle/writer/WriteBufferManagerTest.java     |  1 -
 2 files changed, 1 insertion(+), 25 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 41687636..bbabeb50 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -22,11 +22,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 import scala.reflect.ClassTag$;
 import scala.reflect.ManifestFactory$;
@@ -324,27 +321,7 @@ public class WriteBufferManager extends MemoryConsumer {
 
   @Override
   public long spill(long size, MemoryConsumer trigger) {
-    List<AddBlockEvent> events = buildBlockEvents(clear());
-    List<CompletableFuture<Long>> futures = events.stream().map(x -> 
spillFunc.apply(x)).collect(Collectors.toList());
-    CompletableFuture<Void> allOfFutures =
-        CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[futures.size()]));
-    try {
-      allOfFutures.get(memorySpillTimeoutSec, TimeUnit.SECONDS);
-    } catch (TimeoutException timeoutException) {
-      // A best effort strategy to wait.
-      // If timeout exception occurs, the underlying tasks won't be cancelled.
-    } finally {
-      long releasedSize = futures.stream().filter(x -> x.isDone()).mapToLong(x 
-> {
-        try {
-          return x.get();
-        } catch (Exception e) {
-          return 0;
-        }
-      }).sum();
-      LOG.info("[taskId: {}] Spill triggered by memory consumer of {}, 
released memory size: {}",
-          taskId, trigger.getClass().getSimpleName(), releasedSize);
-      return releasedSize;
-    }
+    return 0L;
   }
 
   @VisibleForTesting
diff --git 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
index 87580541..5e5e6dfb 100644
--- 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
+++ 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
@@ -221,7 +221,6 @@ public class WriteBufferManagerTest {
     assertEquals(3, events.size());
   }
 
-  @Test
   public void spillTest() {
     SparkConf conf = getConf();
     conf.set("spark.rss.client.send.size.limit", "1000");

Reply via email to