This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b3855e286 [#2495] feat(server): Add an option to disable flush buffer 
when caching data (#2497)
b3855e286 is described below

commit b3855e28661493ad374f228b44eb72a8c5ecabc0
Author: xianjingfeng <[email protected]>
AuthorDate: Tue Jun 10 15:46:47 2025 +0800

    [#2495] feat(server): Add an option to disable flush buffer when caching 
data (#2497)
    
    ### What changes were proposed in this pull request?
    
    Add an option to disable triggering a buffer flush when caching data.
    
    ### Why are the changes needed?
    
    Fix: #2496
    
    ### Does this PR introduce _any_ user-facing change?
    
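    Yes. A new option, `rss.server.buffer.flush.triggeredWhenCachingData`
    (default `true`), is added. To stop buffer flushes from being triggered
    while caching data, set it to `false` and make sure
    `rss.server.shuffleBufferManager.trigger.flush.interval` is greater than 0.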
    
    ### How was this patch tested?
    
    CI
---
 .../java/org/apache/uniffle/server/ShuffleServerConf.java   |  6 ++++++
 .../java/org/apache/uniffle/server/ShuffleTaskManager.java  |  5 ++++-
 .../apache/uniffle/server/buffer/ShuffleBufferManager.java  | 13 +++++++++++--
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 2c9dde21c..edf7d8684 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -414,6 +414,12 @@ public class ShuffleServerConf extends RssBaseConf {
           .withDescription(
               "For localstorage, it will exit when the failed initialized 
local storage exceed the number");
 
+  public static final ConfigOption<Boolean> 
BUFFER_FLUSH_TRIGGERED_WHEN_CACHEING_DATA =
+      ConfigOptions.key("rss.server.buffer.flush.triggeredWhenCachingData")
+          .booleanType()
+          .defaultValue(true)
+          .withDescription("Whether buffer flush will be triggered when 
caching data");
+
   public static final ConfigOption<Boolean> SINGLE_BUFFER_FLUSH_ENABLED =
       ConfigOptions.key("rss.server.single.buffer.flush.enabled")
           .booleanType()
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 08180baa0..08db55836 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -157,7 +157,10 @@ public class ShuffleTaskManager {
         
conf.getLong(ShuffleServerConf.STORAGE_REMOVE_RESOURCE_OPERATION_TIMEOUT_SEC);
     this.leakShuffleDataCheckInterval =
         
conf.getLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL);
+    boolean bufferFlushWhenCachingData =
+        
conf.getBoolean(ShuffleServerConf.BUFFER_FLUSH_TRIGGERED_WHEN_CACHEING_DATA);
     this.triggerFlushInterval = 
conf.getLong(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL);
+    assert bufferFlushWhenCachingData || triggerFlushInterval > 0;
     // the thread for checking application status
     this.scheduledExecutorService =
         ThreadUtils.getDaemonSingleThreadScheduledExecutor("checkResource");
@@ -1056,7 +1059,7 @@ public class ShuffleTaskManager {
   }
 
   private void triggerFlush() {
-    synchronized (this.shuffleBufferManager) {
+    if (this.shuffleBufferManager.needToFlush()) {
       this.shuffleBufferManager.flushIfNecessary();
     }
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 4c29dd943..c4ed44d29 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -73,6 +73,7 @@ public class ShuffleBufferManager {
   private long readCapacity;
   private long highWaterMark;
   private long lowWaterMark;
+  private final boolean bufferFlushWhenCachingData;
   private boolean bufferFlushEnabled;
   private long bufferFlushThreshold;
   private long bufferFlushBlocksNumThreshold;
@@ -133,6 +134,8 @@ public class ShuffleBufferManager {
             (capacity
                 / 100.0
                 * 
conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE));
+    this.bufferFlushWhenCachingData =
+        
conf.getBoolean(ShuffleServerConf.BUFFER_FLUSH_TRIGGERED_WHEN_CACHEING_DATA);
     this.bufferFlushEnabled = 
conf.getBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED);
     this.bufferFlushThreshold =
         
conf.getSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD);
@@ -292,6 +295,8 @@ public class ShuffleBufferManager {
           spd.getPartitionId(),
           entry.getKey().lowerEndpoint(),
           entry.getKey().upperEndpoint());
+    }
+    if (bufferFlushWhenCachingData && needToFlush()) {
       flushIfNecessary();
     }
     return StatusCode.SUCCESS;
@@ -374,9 +379,13 @@ public class ShuffleBufferManager {
     }
   }
 
-  public void flushIfNecessary() {
+  public boolean needToFlush() {
     // if data size in buffer > highWaterMark, do the flush
-    if (usedMemory.get() - preAllocatedSize.get() - inFlushSize.get() > 
highWaterMark) {
+    return usedMemory.get() - preAllocatedSize.get() - inFlushSize.get() > 
highWaterMark;
+  }
+
+  public synchronized void flushIfNecessary() {
+    if (needToFlush()) {
       // todo: add a metric here to track how many times flush occurs.
       LOG.info(
           "Start to flush with usedMemory[{}], preAllocatedSize[{}], 
inFlushSize[{}]",

Reply via email to