akalash commented on a change in pull request #15885:
URL: https://github.com/apache/flink/pull/15885#discussion_r633628445
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
##########
@@ -120,38 +120,36 @@ public void checkpointStopped(long checkpointId) {
}
public void onRecoveredStateBuffer(Buffer buffer) {
- boolean recycleBuffer = true;
NetworkActionsLogger.traceRecover(
"InputChannelRecoveredStateHandler#recover",
buffer,
inputGate.getOwningTaskName(),
channelInfo);
- try {
- final boolean wasEmpty;
- synchronized (receivedBuffers) {
- // Similar to notifyBufferAvailable(), make sure that we never add a buffer
- // after releaseAllResources() released all buffers from receivedBuffers.
- if (isReleased) {
- wasEmpty = false;
- } else {
- wasEmpty = receivedBuffers.isEmpty();
- receivedBuffers.add(buffer);
- recycleBuffer = false;
- }
- }
- if (wasEmpty) {
- notifyChannelNonEmpty();
- }
- } finally {
- if (recycleBuffer) {
- buffer.recycleBuffer();
+ final boolean wasEmpty;
+ synchronized (receivedBuffers) {
+ // Similar to notifyBufferAvailable(), make sure that we never add a buffer
+ // after releaseAllResources() released all buffers from receivedBuffers.
+ if (isReleased) {
+ wasEmpty = false;
+ } else {
+ wasEmpty = receivedBuffers.isEmpty();
+ receivedBuffers.add(buffer.retainBuffer());
Review comment:
Just in case, a little spoiler: these changes are not the target of the
current task, so I can easily revert them.
Of course, it is not OK that different parts of the code are inconsistent.
So I will change everything in the same manner once we agree on a solution
(or do nothing if we decide to leave everything as is).
Regarding the original question: as far as I can see, there is currently no
clear pattern for using retain/release. That is why I want to find something
that everybody can understand. In my opinion, the classic
reference/resource-counting pattern looks like this:
- If you request a resource, you must release it, no matter what happens
next.
- If you share a resource, you must increase the counter (ideally, you can
share only via a specific structure that does all the work for you, e.g. a
BufferList that retains buffers automatically, instead of List<Buffer>).
- If you are a passive consumer that receives a resource from outside, you
should not manage that resource at all.
```
void wrong() {
    Buffer buf = getOrCreate(); // requests a resource but never releases it
    ...
}
void right() {
    Buffer buf = getOrCreate(); // request resource
    try {
        ...
    } finally {
        buf.release();
    }
}
```
```
void wrong(Buffer buf) {
    // how can I be sure the counter is high enough for this release?
    buf.release();
    // retain without release; can be surprising for the caller of this method
    buf.retain();
}
void right(Buffer buf) {
    buf.retain(); // useless but possible
    try {
        ...
    } finally {
        buf.release();
    }
}
```
```
void wrong() {
    Buffer buf = getOrCreate(); // request resource
    // too dangerous: can lead to a leak after refactoring, but possible in
    // extreme cases for good performance
    list.add(buf);
}
void right() {
    Buffer buf = getOrCreate(); // request resource
    try {
        // a BufferList with automatic 'retain' would be better, but this is
        // also possible
        list.add(buf.retain());
    } finally {
        buf.release();
    }
}
```
```
void right(Buffer buf) {
    // a BufferList with automatic 'retain' would be better, but this is
    // also possible
    list.add(buf.retain());
    // again, something more specific for sharing resources with other
    // threads would be better, but this also works
    buf.retain();
    executor.submit(() -> { /* some work with buffer */ });
}
```
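As a rough illustration of the BufferList idea from the second rule, here is
a minimal single-threaded sketch. `CountedBuffer`, `BufferList`, and
`produce` are hypothetical names made up for this example, not Flink classes
(Flink's real `Buffer` uses `retainBuffer()`/`recycleBuffer()`), and the
counter is deliberately not thread-safe to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

public class RefCountingSketch {

    /** Hypothetical stand-in for a reference-counted buffer (not Flink's Buffer). */
    static final class CountedBuffer {
        private int refCount = 1; // the creator owns the first reference

        CountedBuffer retain() {
            if (refCount <= 0) {
                throw new IllegalStateException("buffer already recycled");
            }
            refCount++;
            return this;
        }

        void release() {
            if (refCount <= 0) {
                throw new IllegalStateException("released more often than retained");
            }
            refCount--;
        }

        int refCount() {
            return refCount;
        }

        boolean isRecycled() {
            return refCount == 0;
        }
    }

    /** Hypothetical container that takes its own reference on add. */
    static final class BufferList {
        private final List<CountedBuffer> buffers = new ArrayList<>();

        void add(CountedBuffer buf) {
            buffers.add(buf.retain()); // sharing: the list owns one reference
        }

        void releaseAll() {
            for (CountedBuffer buf : buffers) {
                buf.release(); // give back the list's reference
            }
            buffers.clear();
        }
    }

    /** Rule 1: whoever requests the resource releases it, no matter what. */
    static CountedBuffer produce(BufferList list) {
        CountedBuffer buf = new CountedBuffer(); // request resource
        try {
            list.add(buf); // the list retains on its own
        } finally {
            buf.release(); // the requester always releases its reference
        }
        return buf; // returned only so the caller can inspect the counter
    }

    public static void main(String[] args) {
        BufferList list = new BufferList();
        CountedBuffer buf = produce(list);
        System.out.println(buf.refCount()); // prints 1: only the list holds it
        list.releaseAll();
        System.out.println(buf.isRecycled()); // prints true
    }
}
```

With such a structure the producer never calls `retain()` itself, so a later
refactoring of `produce` cannot silently drop it.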
So let's discuss: what do you think about it? Maybe you can also share your
rules for using the retain/release pattern that I did not pick up from the
code.