This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dea458d732c653ccb8b11a17ed009461271d9326
Author: Yuxin Tan <[email protected]>
AuthorDate: Mon Sep 11 20:56:51 2023 +0800

    [hotfix][network] Fix the bug of triggering disk writing in Hybrid Shuffle
---
 .../tiered/storage/TieredStorageMemoryManagerImpl.java  | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
index f11a413c194..c526aafcf09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
@@ -61,6 +61,9 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
     /** Time to wait for requesting new buffers before triggering buffer reclaiming. */
     private static final int INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS = 50;
 
+    /** The maximum delay time before triggering buffer reclaiming. */
+    private static final int MAX_DELAY_TIME_TO_TRIGGER_RECLAIM_BUFFER_MS = 1000;
+
     /** The tiered storage memory specs of each memory user owner. */
     private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs;
 
@@ -168,7 +171,7 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
     public BufferBuilder requestBufferBlocking(Object owner) {
         checkIsInitialized();
 
-        reclaimBuffersIfNeeded();
+        reclaimBuffersIfNeeded(0);
 
         CompletableFuture<Void> requestBufferFuture = new CompletableFuture<>();
         scheduleCheckRequestBufferFuture(
@@ -263,7 +266,7 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
         if (requestBufferFuture.isDone()) {
             return;
         }
-        reclaimBuffersIfNeeded();
+        reclaimBuffersIfNeeded(delayForNextCheckMs);
         scheduleCheckRequestBufferFuture(requestBufferFuture, delayForNextCheckMs);
     }
 
@@ -279,13 +282,13 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
         numRequestedBuffers.decrementAndGet();
     }
 
-    private void reclaimBuffersIfNeeded() {
-        if (shouldReclaimBuffersBeforeRequesting()) {
+    private void reclaimBuffersIfNeeded(long delayForNextCheckMs) {
+        if (shouldReclaimBuffersBeforeRequesting(delayForNextCheckMs)) {
             bufferReclaimRequestListeners.forEach(Runnable::run);
         }
     }
 
-    private boolean shouldReclaimBuffersBeforeRequesting() {
+    private boolean shouldReclaimBuffersBeforeRequesting(long delayForNextCheckMs) {
         // The accuracy of the memory usage ratio may be compromised due to the varying buffer pool
         // sizes. However, this only impacts a single iteration of the buffer usage check. Upon the
         // next iteration, the buffer reclaim will eventually be triggered.
@@ -294,7 +297,9 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
         return numRequested >= numTotal
                 // Because we do the checking before requesting buffers, we need add additional one
                 // buffer when calculating the usage ratio.
-                || ((numRequested + 1) * 1.0 / numTotal) > numTriggerReclaimBuffersRatio;
+                || ((numRequested + 1) * 1.0 / numTotal) > numTriggerReclaimBuffersRatio
+                || delayForNextCheckMs > MAX_DELAY_TIME_TO_TRIGGER_RECLAIM_BUFFER_MS
+                        && bufferPool.getNumberOfAvailableMemorySegments() == 0;
     }
 
     /** Note that this method may be called by the netty thread. */

Reply via email to