[ 
https://issues.apache.org/jira/browse/FLINK-22376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17340830#comment-17340830
 ] 

Anton Kalashnikov commented on FLINK-22376:
-------------------------------------------

Yes, I understand that it is needed to cache isFinished. I was only trying to 
say that a single local isFinished flag would be enough. Right now, 
CachedPositionMarker#update and CachedPositionMarker#getCached are always 
called one right after the other, so we never actually reuse the cached 
position; the cache is needed only for isFinished.

I was just confused when I saw this code. I expected something like this:
  
{noformat}
boolean isFinished;

int writerPosition() {
    int position = positionMarker.get();
    // Remember the finished state locally instead of caching the whole position.
    isFinished = PositionMarker.isFinished(position);
    return position;
}
{noformat}
To be clear, I am not pushing to remove this code, and saving a couple of 
lines may not be worth the change anyway.
----
As for the original problem: of course, I don't like the idea that we simply 
assume certain guarantees (for example, that BufferBuilder cannot be accessed 
after all of its consumers are closed). But if writing directly to the 
MemorySegment performs better, I can propose a fairly simple solution.
BufferBuilder:
{noformat}
public void recycle() {
    // Recycle here only if no consumer exists yet; otherwise the consumers do it.
    if (!bufferConsumerCreated) {
        recycler.recycle(memorySegment);
    }
}
{noformat}
Pattern of usage:
{noformat}
BufferBuilder builder = getOrCreateBuilder();
try {
    // use the builder here
} catch (Exception ex) {
    builder.recycle();
}
{noformat}
In this case, if the BufferBuilder already has consumers, those consumers are 
responsible for recycling. If it does not have any consumers yet and 
something goes wrong, the BufferBuilder recycles the buffer itself.
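
To make the ownership handoff concrete, here is a minimal self-contained 
sketch of the same idea. SketchBuilder, CountingRecycler and run are 
hypothetical stand-ins invented for illustration, not Flink's BufferBuilder or 
BufferRecycler, and the real classes may behave differently. It only shows 
that, under this assumption, the buffer is recycled exactly once whether the 
failure happens before or after a consumer is created.

```java
// Sketch only: SketchBuilder and CountingRecycler are hypothetical stand-ins,
// not Flink's BufferBuilder or BufferRecycler.
class RecycleGuardSketch {

    /** Counts recycle calls so that a double recycle would be visible. */
    static class CountingRecycler {
        int recycleCount = 0;

        void recycle(byte[] segment) {
            recycleCount++;
        }
    }

    /** Stand-in for a builder that owns its buffer until a consumer is created. */
    static class SketchBuilder {
        private final byte[] memorySegment = new byte[16];
        private final CountingRecycler recycler;
        private boolean bufferConsumerCreated = false;

        SketchBuilder(CountingRecycler recycler) {
            this.recycler = recycler;
        }

        /** Creating a consumer hands responsibility for recycling over to it. */
        Runnable createConsumer() {
            bufferConsumerCreated = true;
            // The returned action models what the consumer does when it is closed.
            return () -> recycler.recycle(memorySegment);
        }

        /** Recycles the buffer only if no consumer has taken ownership yet. */
        void recycle() {
            if (!bufferConsumerCreated) {
                recycler.recycle(memorySegment);
            }
        }
    }

    /** Runs the usage pattern once and returns how often the buffer was recycled. */
    static int run(boolean failBeforeConsumer) {
        CountingRecycler recycler = new CountingRecycler();
        SketchBuilder builder = new SketchBuilder(recycler);
        Runnable closeConsumer = null;
        try {
            if (failBeforeConsumer) {
                throw new IllegalStateException("failure before any consumer exists");
            }
            closeConsumer = builder.createConsumer();
            throw new IllegalStateException("failure after a consumer exists");
        } catch (IllegalStateException ex) {
            builder.recycle(); // does nothing once a consumer owns the buffer
            if (closeConsumer != null) {
                closeConsumer.run(); // the consumer recycles when it is closed
            }
        }
        return recycler.recycleCount;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints 1
        System.out.println(run(false)); // prints 1
    }
}
```

Note that in this sketch the flag only guards the handoff to consumers; it 
does not by itself prevent recycle() from being called twice before any 
consumer exists.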

> SequentialChannelStateReaderImpl may recycle buffer twice
> ---------------------------------------------------------
>
>                 Key: FLINK-22376
>                 URL: https://issues.apache.org/jira/browse/FLINK-22376
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network, Runtime / Task
>    Affects Versions: 1.13.0
>            Reporter: Roman Khachatryan
>            Priority: Critical
>             Fix For: 1.14.0, 1.13.1
>
>
> In ChannelStateChunkReader.readChunk, in case of an error, the buffer is 
> recycled in the catch block. However, it might already have been recycled in 
> stateHandler.recover().
> Using minor priority, as this only affects an already failing path.
>  



