jiangxin369 commented on code in PR #23957:
URL: https://github.com/apache/flink/pull/23957#discussion_r1444373791
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java:
##########
@@ -129,6 +129,7 @@ public boolean tryWrite(
}
if (finishedBuffer.isBuffer()) {
memoryManager.transferBufferOwnership(bufferOwner, this,
finishedBuffer);
+ memoryManager.ensureCapacity();
Review Comment:
Yes, we should ensure the capacity before starting a new segment instead of
transferring ownership. I've change the implementation, please take a look.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java:
##########
@@ -91,6 +94,15 @@ public class TieredStorageMemoryManagerImpl implements
TieredStorageMemoryManage
*/
private final Map<Object, Integer> numOwnerRequestedBuffers;
+ /**
+ * The queue that contains all available buffers. This field should be
thread-safe because it
+ * can be touched both by the task thread and the netty thread.
+ */
+ private final BlockingQueue<MemorySegment> bufferQueue;
Review Comment:
The `BlockingQueue` implementations are required to be thread-safe according
to the Java API doc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]