TanYuxin-tyx commented on code in PR #23851:
URL: https://github.com/apache/flink/pull/23851#discussion_r1472434029


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortBuffer.java:
##########
@@ -184,6 +184,27 @@ public boolean append(ByteBuffer source, int 
targetSubpartition, Buffer.DataType
         return false;
     }
 
+    /**
+     * Try to release some unused memory segments.
+     *
+     * <p>Note that this class is not thread safe, so please make sure to call 
{@link
+     * #append(ByteBuffer source, int targetSubpartition, Buffer.DataType 
dataType)} and this method
+     * with lock acquired.
+     *
+     * @param numFreeSegments the number of segments to be released.
+     * @return true if released successfully, otherwise false.
+     */
+    public boolean returnFreeSegments(int numFreeSegments) {
+        if (numFreeSegments < numGuaranteedBuffers - segments.size()) {
+            for (int i = 0; i < numFreeSegments; i++) {
+                bufferRecycler.recycle(freeSegments.poll());

Review Comment:
   I have a concern here. Directly polling buffer and recycling it may not be 
right. 
   
   We should ensure that the left buffers are enough when reading buffers from 
the sort buffer. Only when the left buffers are enough for reading, we can 
recycle the buffers safely. (We can record the initial total number of the 
buffers and decide whether to recycle according to the left buffers.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to