pnowojski commented on a change in pull request #15885:
URL: https://github.com/apache/flink/pull/15885#discussion_r633394247
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
##########
@@ -120,38 +120,36 @@ public void checkpointStopped(long checkpointId) {
}
public void onRecoveredStateBuffer(Buffer buffer) {
- boolean recycleBuffer = true;
NetworkActionsLogger.traceRecover(
"InputChannelRecoveredStateHandler#recover",
buffer,
inputGate.getOwningTaskName(),
channelInfo);
- try {
- final boolean wasEmpty;
- synchronized (receivedBuffers) {
- // Similar to notifyBufferAvailable(), make sure that we never add a buffer
- // after releaseAllResources() released all buffers from receivedBuffers.
- if (isReleased) {
- wasEmpty = false;
- } else {
- wasEmpty = receivedBuffers.isEmpty();
- receivedBuffers.add(buffer);
- recycleBuffer = false;
- }
- }
- if (wasEmpty) {
- notifyChannelNonEmpty();
- }
- } finally {
- if (recycleBuffer) {
- buffer.recycleBuffer();
+ final boolean wasEmpty;
+ synchronized (receivedBuffers) {
+ // Similar to notifyBufferAvailable(), make sure that we never add a buffer
+ // after releaseAllResources() released all buffers from receivedBuffers.
+ if (isReleased) {
+ wasEmpty = false;
+ } else {
+ wasEmpty = receivedBuffers.isEmpty();
+ receivedBuffers.add(buffer.retainBuffer());
Review comment:
This makes this method inconsistent with `RemoteInputChannel#onBuffer`,
which makes things a bit more confusing. Also, the old way seems more
natural/explicit to me: the `onBuffer()` call transfers ownership of the
buffer to the `Remote/RecoveredInputChannel`, and if a caller wants to re-use
this buffer elsewhere, it should be the one doing the retaining.
Either way, I think this method's Javadoc should document the contract:
whether ownership of the passed `buffer` argument is taken by this instance
or not.
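To make the two possible contracts concrete, here is a minimal sketch of the "callee takes ownership" variant. `RefCountedBuffer`, `OwningChannel` and `OwnershipDemo` are hypothetical stand-ins written for illustration only; they are not Flink's `Buffer` or input channel classes and only mimic the retain/recycle idea discussed above.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical reference-counted buffer; NOT Flink's Buffer API. */
final class RefCountedBuffer {
    private int refCount = 1; // the creator holds the first reference

    /** Adds one reference and returns this buffer for chaining. */
    RefCountedBuffer retain() {
        if (refCount == 0) {
            throw new IllegalStateException("already recycled");
        }
        refCount++;
        return this;
    }

    /** Drops one reference; the buffer counts as recycled at zero. */
    void recycle() {
        if (refCount == 0) {
            throw new IllegalStateException("already recycled");
        }
        refCount--;
    }

    int refCount() {
        return refCount;
    }

    boolean isRecycled() {
        return refCount == 0;
    }
}

/** Hypothetical channel that follows the "callee takes ownership" contract. */
final class OwningChannel {
    private final Queue<RefCountedBuffer> received = new ArrayDeque<>();
    private boolean released;

    /**
     * Takes ownership of {@code buffer}: it is either queued or recycled here.
     * A caller that wants to keep using the buffer must call retain() first.
     */
    synchronized void onBuffer(RefCountedBuffer buffer) {
        if (released) {
            buffer.recycle(); // never queue after release
        } else {
            received.add(buffer);
        }
    }

    /** Recycles every queued buffer and rejects later ones. */
    synchronized void releaseAllResources() {
        released = true;
        RefCountedBuffer queued;
        while ((queued = received.poll()) != null) {
            queued.recycle();
        }
    }
}

final class OwnershipDemo {
    public static void main(String[] args) {
        OwningChannel channel = new OwningChannel();

        // Caller hands the buffer over and does not touch it again.
        RefCountedBuffer handedOver = new RefCountedBuffer();
        channel.onBuffer(handedOver);

        // Caller wants to keep the buffer, so the caller retains it.
        RefCountedBuffer shared = new RefCountedBuffer();
        channel.onBuffer(shared.retain());

        channel.releaseAllResources();
        System.out.println(handedOver.isRecycled()); // channel held the only reference
        System.out.println(shared.refCount());       // caller's own reference remains
    }
}
```

With this contract the retain call is visible at the call site that actually shares the buffer, which is the explicitness argued for above. The alternative (the channel retains internally) would move `retain()` into `onBuffer()` and leave the caller responsible for recycling its own reference.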
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
##########
@@ -153,7 +153,12 @@ public BufferRecycler getRecycler() {
}
public void recycle() {
- recycler.recycle(memorySegment);
+ // If at least one consumer was created then they responsible for the memory recycling
+ // because BufferBuilder doesn't contain a references counter so it will be impossible to
+ // correctly recycle memory here.
+ if (!bufferConsumerCreated) {
+ recycler.recycle(memorySegment);
+ }
Review comment:
Frankly, this (still?) seems like a partial solution/hack, because
what's the contract for when this method should be called? Before this `recycle()`
method was introduced, it was at least clear that `BufferBuilder` never
recycles the segment, and that this is always done by closing the `BufferConsumer`s.
Now it seems like we are going deeper into murky waters, where "sometimes"
`recycle()` should be called?
I think I would like the "hack" of relying on the `bufferConsumerCreated`
flag to avoid retaining the buffer in the `BufferBuilder`, but I think it's
still confusing:
1. It would probably be better to make `BufferBuilder` implement `Closeable`
and rename `recycle()` to `close()`. This would probably cause quite a few
changes, especially in tests.
2. It still doesn't solve the problem of writing to an already released
`memorySegment`: what if the `BufferConsumer` was created and has already
been closed, while someone is still writing data to the `BufferBuilder`? There
was a bug like that, which I fixed on a different layer, but maybe we should fix
it here as well after all? In particular, we could measure on the benchmarking
machine whether retaining and recycling the `NetworkBuffer` one extra time
adds any overhead.
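As a rough illustration of both points, here is a sketch in which the builder is `Closeable` and holds its own reference to the segment, so a closed consumer cannot release memory that is still being written. `SharedSegment`, `SketchBufferBuilder` and `SketchBufferConsumer` are hypothetical names made up for this sketch; they are not the actual Flink classes, and whether the extra retain/recycle is cheap enough is exactly what would need to be benchmarked.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical segment shared by one builder and its consumers. */
final class SharedSegment {
    private final AtomicInteger refCount = new AtomicInteger(1);
    private final byte[] memory;

    SharedSegment(int size) {
        this.memory = new byte[size];
    }

    void retain() {
        refCount.updateAndGet(count -> {
            if (count == 0) {
                throw new IllegalStateException("segment already recycled");
            }
            return count + 1;
        });
    }

    /** Drops one reference; the segment counts as recycled at zero. */
    void release() {
        refCount.updateAndGet(count -> {
            if (count == 0) {
                throw new IllegalStateException("segment already recycled");
            }
            return count - 1;
        });
    }

    boolean isRecycled() {
        return refCount.get() == 0;
    }

    void put(int index, byte value) {
        if (isRecycled()) {
            throw new IllegalStateException("write to recycled segment");
        }
        memory[index] = value;
    }
}

/** Hypothetical builder: owns the initial reference until close(). */
final class SketchBufferBuilder implements Closeable {
    private final SharedSegment segment;
    private int writerIndex;
    private boolean closed;

    SketchBufferBuilder(SharedSegment segment) {
        this.segment = segment; // takes over the creator's reference
    }

    SketchBufferConsumer createConsumer() {
        segment.retain(); // each consumer gets its own reference
        return new SketchBufferConsumer(segment);
    }

    void append(byte value) {
        if (closed) {
            throw new IllegalStateException("builder closed");
        }
        segment.put(writerIndex++, value);
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            segment.release();
        }
    }
}

/** Hypothetical consumer: releases only its own reference on close(). */
final class SketchBufferConsumer implements Closeable {
    private final SharedSegment segment;
    private boolean closed;

    SketchBufferConsumer(SharedSegment segment) {
        this.segment = segment;
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            segment.release();
        }
    }
}
```

In this variant `close()` always has the same meaning ("drop my reference"), regardless of whether a consumer was ever created, at the cost of one additional retain/release pair per buffer compared with the `bufferConsumerCreated` flag.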