TanYuxin-tyx commented on code in PR #23957:
URL: https://github.com/apache/flink/pull/23957#discussion_r1435909606
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -244,6 +282,39 @@ public void release() {
ExceptionUtils.rethrow(e);
}
}
+ while (!bufferQueue.isEmpty()) {
+ MemorySegment segment = bufferQueue.poll();
+ bufferPool.recycle(segment);
+ numRequestedBuffers.decrementAndGet();
+ }
+ }
+
+ @VisibleForTesting
Review Comment:
Do we really need `@VisibleForTesting` here?
Could the behavior be tested through the existing public/protected methods instead?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemorySpec.java:
##########
@@ -30,9 +30,21 @@ public class TieredStorageMemorySpec {
/** The number of guaranteed buffers of this memory owner. */
private final int numGuaranteedBuffers;
+ /**
+ * Whether the buffers of this owner can definitely be recycled, even if the downstream does not
+ * consume them promptly.
+ */
+ private final boolean definitelyRecycled;
Review Comment:
The recent change introduces two control points for whether a tier can
recycle buffers (this flag and `bufferReclaimRequestListeners` in the memory
manager), which can lead to inconsistencies. To avoid this, a tier's ability
to recycle buffers could be determined solely from
`bufferReclaimRequestListeners` in the memory manager.
`bufferReclaimRequestListeners` could be refactored in one of two ways:
1. Convert it to a `Map<Object, Runnable>` that associates owners with their
reclaim actions.
2. Have each tier implement a uniform recycling API, so that
`bufferReclaimRequestListeners` can hold the tiers themselves.
With either option, we can determine whether a tier can recycle buffers by
checking whether it is registered in `bufferReclaimRequestListeners`.
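For illustration, option 1 might look roughly like the sketch below. This is not the actual Flink code: the class name `ReclaimListenerRegistry`, the `owner` parameter on `listenBufferReclaimRequest`, and the methods `canReclaimBuffers` and `requestReclaim` are assumptions made up for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of option 1; names and signatures are assumptions, not Flink APIs. */
class ReclaimListenerRegistry {

    /** Maps each owner (for example a tier) to the action that reclaims its buffers. */
    private final Map<Object, Runnable> bufferReclaimRequestListeners =
            new ConcurrentHashMap<>();

    /** Registers the reclaim action of an owner (the owner parameter is an assumption). */
    void listenBufferReclaimRequest(Object owner, Runnable onBufferReclaimRequest) {
        bufferReclaimRequestListeners.put(owner, onBufferReclaimRequest);
    }

    /** An owner can recycle buffers if and only if it has registered a listener. */
    boolean canReclaimBuffers(Object owner) {
        return bufferReclaimRequestListeners.containsKey(owner);
    }

    /** Asks every registered owner to reclaim its buffers. */
    void requestReclaim() {
        bufferReclaimRequestListeners.values().forEach(Runnable::run);
    }
}
```

With a registry like this, the separate `definitelyRecycled` flag would no longer be needed, because registration itself is the single source of truth.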
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -244,6 +282,39 @@ public void release() {
ExceptionUtils.rethrow(e);
}
}
+ while (!bufferQueue.isEmpty()) {
+ MemorySegment segment = bufferQueue.poll();
+ bufferPool.recycle(segment);
+ numRequestedBuffers.decrementAndGet();
+ }
+ }
+
+ @VisibleForTesting
+ public int getBufferQueueLength() {
+ return bufferQueue.size();
+ }
+
+ private MemorySegment requestBufferBlocking(boolean fromPool) {
+ CompletableFuture<Void> requestBufferFuture = new CompletableFuture<>();
+ scheduleCheckRequestBufferFuture(
+ requestBufferFuture, INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS);
+
+ MemorySegment memorySegment = null;
+ try {
+ if (fromPool) {
+ hardBackpressureTimerGauge.markStart();
+ memorySegment = bufferPool.requestMemorySegmentBlocking();
+ hardBackpressureTimerGauge.markEnd();
+ } else {
+ memorySegment = bufferQueue.take();
+ }
+ } catch (InterruptedException e) {
Review Comment:
If an exception is thrown here, we should also complete the future before
rethrowing it. This may be a pre-existing bug, but I believe we can fix it here as well.
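One possible way to do this is to complete the future in a `finally` block, so it is completed on both the normal and the exceptional path. The sketch below is a simplified, self-contained stand-in rather than the actual method: the class `BlockingRequestSketch` and the helper `takeAndCompleteFuture` are hypothetical, and a plain `RuntimeException` is used in place of `ExceptionUtils.rethrow`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch; not the Flink implementation. */
class BlockingRequestSketch {

    /**
     * Blocks until an element is available and always completes the given future,
     * even if the blocking call is interrupted.
     */
    static <T> T takeAndCompleteFuture(
            BlockingQueue<T> queue, CompletableFuture<Void> requestFuture) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            // Restore the interrupt flag, then rethrow as unchecked (assumed handling).
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            // Runs on both paths, so the scheduled timeout check sees a completed future.
            requestFuture.complete(null);
        }
    }
}
```

Completing the future explicitly inside the `catch` block before rethrowing would work as well; the `finally` variant just keeps the completion in one place.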