akalash commented on a change in pull request #15885:
URL: https://github.com/apache/flink/pull/15885#discussion_r633578223
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
##########
@@ -153,7 +153,12 @@ public BufferRecycler getRecycler() {
}
public void recycle() {
- recycler.recycle(memorySegment);
+ // If at least one consumer was created then they responsible for the memory recycling
+ // because BufferBuilder doesn't contain a references counter so it will be impossible to
+ // correctly recycle memory here.
+ if (!bufferConsumerCreated) {
+ recycler.recycle(memorySegment);
+ }
Review comment:
Yes, it is still definitely a hack. In my opinion, the right solution is to avoid writing directly into the `MemorySegment` from `BufferBuilder`. That is, we should change the implementation of `BufferBuilder` so that it uses a `Buffer` instead of a `MemorySegment`. As I understand it, you have no objections to such a solution if the benchmark doesn't show any degradation?
In any case, here are some answers to your questions:
- The contract is simple: `BufferBuilder#recycle()` should always be called when the `BufferBuilder` is no longer needed. You don't need to think about whether a `BufferConsumer` was created or not.
- In general, renaming `recycle()` to `close()` makes sense to me, since `BufferBuilder` doesn't have a `retain` method and, ideally, should be closed after use (we can think about it once we agree on a final solution).
- A couple of problems are still unresolved: writing to an already released `memorySegment`, or creating a `BufferConsumer` from an already closed `BufferBuilder`. Both can be resolved by the solution we already discussed (using a `Buffer` instead of a `memorySegment` inside `BufferBuilder`).
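For illustration, here is a minimal sketch of the direction discussed above, where the builder holds a reference to a counted buffer instead of a raw segment. All class names (`CountedMemory`, `SketchBufferBuilder`, `SketchBufferConsumer`) are hypothetical and are not Flink's `Buffer`/`MemorySegment` API, and the sketch ignores the concurrency between the writer and consumer threads. It shows that `close()` (the proposed rename of `recycle()`) can always be called, that each consumer takes its own reference, and that a closed builder rejects further writes and consumer creation.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical reference-counted memory (NOT Flink's Buffer API); onFree runs when the last reference is released. */
final class CountedMemory {
    private final byte[] memory;
    private final Runnable onFree;
    private final AtomicInteger refCnt = new AtomicInteger(1); // the requester owns the first reference

    CountedMemory(int size, Runnable onFree) {
        this.memory = new byte[size];
        this.onFree = onFree;
    }

    CountedMemory retain() {
        // Refuse to resurrect memory that has already been recycled.
        refCnt.updateAndGet(c -> {
            if (c <= 0) throw new IllegalStateException("already released");
            return c + 1;
        });
        return this;
    }

    void release() {
        int c = refCnt.decrementAndGet();
        if (c == 0) {
            onFree.run(); // last reference gone: recycle the memory exactly once
        } else if (c < 0) {
            throw new IllegalStateException("released more often than retained");
        }
    }

    byte[] memory() {
        return memory;
    }
}

/** Hypothetical builder that owns one reference instead of writing into a raw segment. */
final class SketchBufferBuilder implements AutoCloseable {
    private final CountedMemory buffer;
    private int writerPosition;
    private boolean closed;

    SketchBufferBuilder(CountedMemory buffer) {
        this.buffer = buffer;
    }

    void append(byte value) {
        checkOpen(); // no writes into memory that may already be recycled
        buffer.memory()[writerPosition++] = value;
    }

    /** Each consumer takes its own reference, so it may outlive the builder. */
    SketchBufferConsumer createBufferConsumer() {
        checkOpen(); // no consumers from a closed builder
        return new SketchBufferConsumer(buffer.retain());
    }

    /** Always safe to call, whether or not consumers were created. */
    @Override
    public void close() {
        if (!closed) {
            closed = true;
            buffer.release();
        }
    }

    private void checkOpen() {
        if (closed) throw new IllegalStateException("BufferBuilder already closed");
    }
}

/** Hypothetical consumer that releases only the reference it was given. */
final class SketchBufferConsumer implements AutoCloseable {
    private final CountedMemory buffer;
    private boolean closed;

    SketchBufferConsumer(CountedMemory buffer) {
        this.buffer = buffer;
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            buffer.release();
        }
    }
}
```

In this sketch the memory is recycled only when the last of the builder and its consumers is closed, so the caller would no longer need a `bufferConsumerCreated` flag.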
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
##########
@@ -120,38 +120,36 @@ public void checkpointStopped(long checkpointId) {
}
public void onRecoveredStateBuffer(Buffer buffer) {
- boolean recycleBuffer = true;
NetworkActionsLogger.traceRecover(
"InputChannelRecoveredStateHandler#recover",
buffer,
inputGate.getOwningTaskName(),
channelInfo);
- try {
- final boolean wasEmpty;
- synchronized (receivedBuffers) {
- // Similar to notifyBufferAvailable(), make sure that we never add a buffer
- // after releaseAllResources() released all buffers from receivedBuffers.
- if (isReleased) {
- wasEmpty = false;
- } else {
- wasEmpty = receivedBuffers.isEmpty();
- receivedBuffers.add(buffer);
- recycleBuffer = false;
- }
- }
- if (wasEmpty) {
- notifyChannelNonEmpty();
- }
- } finally {
- if (recycleBuffer) {
- buffer.recycleBuffer();
+ final boolean wasEmpty;
+ synchronized (receivedBuffers) {
+ // Similar to notifyBufferAvailable(), make sure that we never add a buffer
+ // after releaseAllResources() released all buffers from receivedBuffers.
+ if (isReleased) {
+ wasEmpty = false;
+ } else {
+ wasEmpty = receivedBuffers.isEmpty();
+ receivedBuffers.add(buffer.retainBuffer());
Review comment:
Just in case, a small spoiler: these changes are not the target of the current task, so I can easily revert them.
Of course, it is not OK that different parts of the code are inconsistent with each other. So I will change everything in the same manner once we agree on the solution (or do nothing if we decide to leave everything as is).
Regarding the original question: as far as I can see, there is currently no clear pattern for using retain/release. That is why I want to find something that is understandable for everybody. In my opinion, the classic reference/resource-counting pattern looks like this:
- If you request a resource, you should release it no matter what happens next.
- If you share a resource, you should increase the counter (ideally, you can share it only via a dedicated structure that does all the work for you, e.g. a `BufferList` instead of a `List<Buffer>`, which retains buffers automatically).
- If you are a passive consumer that receives resources from outside, you should not manage those resources at all.
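```
void wrong() {
    Buffer buf = getOrCreate(); // requests a resource but never releases it
    // ... work with buf ...
}

void right() {
    Buffer buf = getOrCreate(); // requests a resource
    try {
        // ... work with buf ...
    } finally {
        buf.release(); // released no matter what happens in between
    }
}
```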
```
void wrong(Buffer buf) {
    buf.release(); // releases a reference this method never acquired: how can it be sure the counter is high enough?
    buf.retain();  // retains without a matching release, which can surprise the caller of this method
}

void right(Buffer buf) {
    buf.retain(); // not needed here, but possible
    try {
        // ... work with buf ...
    } finally {
        buf.release(); // balances the retain() above
    }
}
```
```
void wrong() {
    Buffer buf = getOrCreate(); // requests a resource
    list.add(buf); // implicit ownership transfer: too dangerous, a refactoring can easily cause a leak, but possible in extreme cases for performance
}

void right() {
    Buffer buf = getOrCreate(); // requests a resource
    try {
        list.add(buf.retain()); // a BufferList that retains automatically would be better, but this is also possible
    } finally {
        buf.release(); // the list keeps its own reference
    }
}
```
```
void right(Buffer buf) { // passive consumer: does not release the reference it received
    list.add(buf.retain()); // a BufferList that retains automatically would be better, but this is also possible
    buf.retain(); // again, a dedicated structure for sharing with other threads would be better, but this also works
    executor.submit(() -> {
        try {
            // ... some work with buf ...
        } finally {
            buf.release(); // the task releases the reference retained for it
        }
    });
}
```
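Both the rules and the examples above mention a `BufferList` that retains buffers automatically. A minimal sketch of that idea could look like this, assuming a hypothetical `SimpleBuffer` with the same `retain()`/`release()` names used in the examples (not Flink's `Buffer` interface, which uses `retainBuffer()`/`recycleBuffer()`). The list owns exactly one reference per element it holds.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical minimal ref-counted buffer mirroring the retain()/release() names above. */
final class SimpleBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1); // the requester owns the first reference

    SimpleBuffer retain() {
        refCnt.updateAndGet(c -> {
            if (c <= 0) throw new IllegalStateException("already released");
            return c + 1;
        });
        return this;
    }

    void release() {
        if (refCnt.decrementAndGet() < 0) {
            throw new IllegalStateException("released more often than retained");
        }
    }

    int refCnt() {
        return refCnt.get();
    }
}

/** Hypothetical BufferList: holds its own reference for every element it contains. */
final class BufferList implements AutoCloseable {
    private final Deque<SimpleBuffer> buffers = new ArrayDeque<>();

    /** Shares the buffer with the list; the caller keeps (and must still release) its own reference. */
    void add(SimpleBuffer buffer) {
        buffers.add(buffer.retain());
    }

    /** Hands the list's reference over to the caller, who becomes responsible for releasing it. */
    SimpleBuffer poll() {
        return buffers.poll();
    }

    int size() {
        return buffers.size();
    }

    /** Releases every reference still owned by the list. */
    @Override
    public void close() {
        SimpleBuffer buffer;
        while ((buffer = buffers.poll()) != null) {
            buffer.release();
        }
    }
}
```

With such a container, `list.add(buf)` no longer needs an explicit `retain()`, while the caller still releases its own reference in `finally`, as in the `right()` examples.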
So let's discuss: what do you think about it? And maybe you can share your own rules for the retain/release pattern, which I couldn't infer from the code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.