This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
commit dea458d732c653ccb8b11a17ed009461271d9326 Author: Yuxin Tan <[email protected]> AuthorDate: Mon Sep 11 20:56:51 2023 +0800 [hotfix][network] Fix the bug of triggering disk writing in Hybrid Shuffle --- .../tiered/storage/TieredStorageMemoryManagerImpl.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java index f11a413c194..c526aafcf09 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java @@ -61,6 +61,9 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage /** Time to wait for requesting new buffers before triggering buffer reclaiming. */ private static final int INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS = 50; + /** The maximum delay time before triggering buffer reclaiming. */ + private static final int MAX_DELAY_TIME_TO_TRIGGER_RECLAIM_BUFFER_MS = 1000; + /** The tiered storage memory specs of each memory user owner. */ private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs; @@ -168,7 +171,7 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage public BufferBuilder requestBufferBlocking(Object owner) { checkIsInitialized(); - reclaimBuffersIfNeeded(); + reclaimBuffersIfNeeded(0); CompletableFuture<Void> requestBufferFuture = new CompletableFuture<>(); scheduleCheckRequestBufferFuture( @@ -263,7 +266,7 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage if (requestBufferFuture.isDone()) { return; } - reclaimBuffersIfNeeded(); + reclaimBuffersIfNeeded(delayForNextCheckMs); scheduleCheckRequestBufferFuture(requestBufferFuture, delayForNextCheckMs); } @@ -279,13 +282,13 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage numRequestedBuffers.decrementAndGet(); } - private void reclaimBuffersIfNeeded() { - if (shouldReclaimBuffersBeforeRequesting()) { + private void reclaimBuffersIfNeeded(long delayForNextCheckMs) { + if (shouldReclaimBuffersBeforeRequesting(delayForNextCheckMs)) { bufferReclaimRequestListeners.forEach(Runnable::run); } } - private boolean shouldReclaimBuffersBeforeRequesting() { + private boolean shouldReclaimBuffersBeforeRequesting(long delayForNextCheckMs) { // The accuracy of the memory usage ratio may be compromised due to the varying buffer pool // sizes. However, this only impacts a single iteration of the buffer usage check. Upon the // next iteration, the buffer reclaim will eventually be triggered. @@ -294,7 +297,9 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage return numRequested >= numTotal // Because we do the checking before requesting buffers, we need add additional one // buffer when calculating the usage ratio. - || ((numRequested + 1) * 1.0 / numTotal) > numTriggerReclaimBuffersRatio; + || ((numRequested + 1) * 1.0 / numTotal) > numTriggerReclaimBuffersRatio + || delayForNextCheckMs > MAX_DELAY_TIME_TO_TRIGGER_RECLAIM_BUFFER_MS + && bufferPool.getNumberOfAvailableMemorySegments() == 0; } /** Note that this method may be called by the netty thread. */
