This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b3855e286 [#2495] feat(server): Add an option to disable flush buffer
when caching data (#2497)
b3855e286 is described below
commit b3855e28661493ad374f228b44eb72a8c5ecabc0
Author: xianjingfeng <[email protected]>
AuthorDate: Tue Jun 10 15:46:47 2025 +0800
[#2495] feat(server): Add an option to disable flush buffer when caching
data (#2497)
### What changes were proposed in this pull request?
Add an option to disable triggering a buffer flush when data is cached.
### Why are the changes needed?
Fix: #2496
### Does this PR introduce _any_ user-facing change?
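Yes. A new option `rss.server.buffer.flush.triggeredWhenCachingData`
(default: true) is added. To disable flushing while caching data, set it to
false and make sure `rss.server.shuffleBufferManager.trigger.flush.interval`
is greater than 0, so that flushing is still triggered by the periodic check.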
### How was this patch tested?
CI
---
.../java/org/apache/uniffle/server/ShuffleServerConf.java | 6 ++++++
.../java/org/apache/uniffle/server/ShuffleTaskManager.java | 5 ++++-
.../apache/uniffle/server/buffer/ShuffleBufferManager.java | 13 +++++++++++--
3 files changed, 21 insertions(+), 3 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 2c9dde21c..edf7d8684 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -414,6 +414,12 @@ public class ShuffleServerConf extends RssBaseConf {
.withDescription(
"For localstorage, it will exit when the failed initialized
local storage exceed the number");
+ public static final ConfigOption<Boolean>
BUFFER_FLUSH_TRIGGERED_WHEN_CACHEING_DATA =
+ ConfigOptions.key("rss.server.buffer.flush.triggeredWhenCachingData")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether buffer flush will be triggered when
caching data");
+
public static final ConfigOption<Boolean> SINGLE_BUFFER_FLUSH_ENABLED =
ConfigOptions.key("rss.server.single.buffer.flush.enabled")
.booleanType()
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 08180baa0..08db55836 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -157,7 +157,10 @@ public class ShuffleTaskManager {
conf.getLong(ShuffleServerConf.STORAGE_REMOVE_RESOURCE_OPERATION_TIMEOUT_SEC);
this.leakShuffleDataCheckInterval =
conf.getLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL);
+ boolean bufferFlushWhenCachingData =
+
conf.getBoolean(ShuffleServerConf.BUFFER_FLUSH_TRIGGERED_WHEN_CACHEING_DATA);
this.triggerFlushInterval =
conf.getLong(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL);
+ assert bufferFlushWhenCachingData || triggerFlushInterval > 0;
// the thread for checking application status
this.scheduledExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("checkResource");
@@ -1056,7 +1059,7 @@ public class ShuffleTaskManager {
}
private void triggerFlush() {
- synchronized (this.shuffleBufferManager) {
+ if (this.shuffleBufferManager.needToFlush()) {
this.shuffleBufferManager.flushIfNecessary();
}
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 4c29dd943..c4ed44d29 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -73,6 +73,7 @@ public class ShuffleBufferManager {
private long readCapacity;
private long highWaterMark;
private long lowWaterMark;
+ private final boolean bufferFlushWhenCachingData;
private boolean bufferFlushEnabled;
private long bufferFlushThreshold;
private long bufferFlushBlocksNumThreshold;
@@ -133,6 +134,8 @@ public class ShuffleBufferManager {
(capacity
/ 100.0
*
conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE));
+ this.bufferFlushWhenCachingData =
+
conf.getBoolean(ShuffleServerConf.BUFFER_FLUSH_TRIGGERED_WHEN_CACHEING_DATA);
this.bufferFlushEnabled =
conf.getBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED);
this.bufferFlushThreshold =
conf.getSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD);
@@ -292,6 +295,8 @@ public class ShuffleBufferManager {
spd.getPartitionId(),
entry.getKey().lowerEndpoint(),
entry.getKey().upperEndpoint());
+ }
+ if (bufferFlushWhenCachingData && needToFlush()) {
flushIfNecessary();
}
return StatusCode.SUCCESS;
@@ -374,9 +379,13 @@ public class ShuffleBufferManager {
}
}
- public void flushIfNecessary() {
+ public boolean needToFlush() {
// if data size in buffer > highWaterMark, do the flush
- if (usedMemory.get() - preAllocatedSize.get() - inFlushSize.get() >
highWaterMark) {
+ return usedMemory.get() - preAllocatedSize.get() - inFlushSize.get() >
highWaterMark;
+ }
+
+ public synchronized void flushIfNecessary() {
+ if (needToFlush()) {
// todo: add a metric here to track how many times flush occurs.
LOG.info(
"Start to flush with usedMemory[{}], preAllocatedSize[{}],
inFlushSize[{}]",