TanYuxin-tyx commented on code in PR #23851:
URL: https://github.com/apache/flink/pull/23851#discussion_r1472434029
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortBuffer.java:
##########
@@ -184,6 +184,27 @@ public boolean append(ByteBuffer source, int targetSubpartition, Buffer.DataType
return false;
}
+ /**
+ * Try to release some unused memory segments.
+ *
+ * <p>Note that this class is not thread safe, so please make sure to call {@link
+ * #append(ByteBuffer source, int targetSubpartition, Buffer.DataType dataType)} and this method
+ * with lock acquired.
+ *
+ * @param numFreeSegments the number of segments to be released.
+ * @return true if released successfully, otherwise false.
+ */
+ public boolean returnFreeSegments(int numFreeSegments) {
+ if (numFreeSegments < numGuaranteedBuffers - segments.size()) {
+ for (int i = 0; i < numFreeSegments; i++) {
+ bufferRecycler.recycle(freeSegments.poll());
Review Comment:
I have a concern here. Directly polling buffers and recycling them may not be
safe.
We should ensure that enough buffers remain for reading data out of the sort
buffer. Only when the remaining buffers are sufficient for reading can we
recycle buffers safely. (We could record the initial total number of buffers
and decide whether to recycle based on how many would remain.)
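
To make the suggestion concrete, here is a rough sketch of the bookkeeping I have in mind. It is not Flink code: `GuardedSegmentPool`, `numInitialSegments`, and `numSegmentsRequiredForReading` are hypothetical names, plain `byte[]` arrays stand in for memory segments, and the minimum needed for reading is an assumed constructor parameter rather than a value taken from `SortBuffer`.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for the sort buffer's free-segment bookkeeping; not Flink code. */
class GuardedSegmentPool {
    private final Queue<byte[]> freeSegments = new ArrayDeque<>();
    // Total number of segments recorded once, when the pool is created.
    private final int numInitialSegments;
    // Assumed minimum number of segments that must stay available for reading.
    private final int numSegmentsRequiredForReading;
    // Number of segments handed back so far.
    private int numRecycled;

    GuardedSegmentPool(int numInitialSegments, int numSegmentsRequiredForReading) {
        this.numInitialSegments = numInitialSegments;
        this.numSegmentsRequiredForReading = numSegmentsRequiredForReading;
        for (int i = 0; i < numInitialSegments; i++) {
            freeSegments.add(new byte[32]); // placeholder for a real memory segment
        }
    }

    /**
     * Releases the requested number of free segments only if enough segments
     * would remain for reading afterwards; otherwise releases nothing.
     */
    boolean returnFreeSegments(int numToRelease) {
        if (numToRelease < 0 || numToRelease > freeSegments.size()) {
            return false; // never poll more segments than are actually free
        }
        int remainingAfterRelease = numInitialSegments - numRecycled - numToRelease;
        if (remainingAfterRelease < numSegmentsRequiredForReading) {
            return false; // recycling would starve the read path
        }
        for (int i = 0; i < numToRelease; i++) {
            freeSegments.poll(); // a real implementation would recycle the segment here
            numRecycled++;
        }
        return true;
    }

    int numRemainingSegments() {
        return numInitialSegments - numRecycled;
    }
}
```

With 10 initial segments and an assumed read minimum of 2, releasing 3 succeeds, a following request for 6 is rejected because only 1 segment would remain, and a request for 5 succeeds because exactly 2 remain. The size check also avoids passing a `null` from `poll()` to the recycler when fewer free segments exist than requested.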