[ 
https://issues.apache.org/jira/browse/FLINK-22376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17340762#comment-17340762
 ] 

Anton Kalashnikov commented on FLINK-22376:
-------------------------------------------

In my opinion, the pattern of using the Buffer should be logically something 
like this:
{noformat}
Buffer buf = getOrCreate();
try {
 ....
 buf.retain();
 try {
 .....
 } finally {
 buf.recycle();
 }
} finally {
 buf.recycle();
}
{noformat}
or
{noformat}
Buffer buf = get();
try {
 ....
 list.add(buf.retain());
} finally {
 buf.recycle();
}

//otherThread/method
Buffer buf =list.get();
try {
 .....
 } finally {
 buf.recycle();
 }
{noformat}
In fact, in most cases, it indeed uses in such a way. But unfortunately when 
BufferBuilder is used this pattern is broken. For example:
{noformat}
BufferBuilder buff = createBufferBuilder();
try{
 BufferConsumer consumer = buff.createBufferConsumer();
 try{
 } finally {
 consumer.recycle();
 }
} finally {
 buff.recycle();//error - this buffer is already recycled when 
consumer.recycle()
}
{noformat}
and
{noformat}
BufferBuilder buff = createBufferBuilder();
try{
 list.add(buff.createBufferConsumer());
} finally {
 buff.recycle();
}

BufferConsumer consumer = list.get();
try{
 //error - it is impossible to use consumer because it is already recycled in 
buff.recycle();
 } finally {
 consumer.recycle();//error - this buffer is already recycled when 
buff.recycle()
 }
{noformat}
This happens because BufferBuilder writes directly to MemorySegment missing the 
Buffer. But the reference count is stored in the Buffer so it is impossible to 
recycle BufferBuilder correctly.

My proposal is to change the implementation of BufferBuilder in such a way that 
it writes in Buffer instead of MemorySegment and during the creation of the new 
consumer it is just 'retain' this buffer. So in this case the Buffer will be 
recycled only when all consumers and source BufferBuilder invoke the recycle.

P.S. I also don't see a lot of sense having the 
BufferConsumer#CachedPositionMarker. So I would want to delete it.

> SequentialChannelStateReaderImpl may recycle buffer twice
> ---------------------------------------------------------
>
>                 Key: FLINK-22376
>                 URL: https://issues.apache.org/jira/browse/FLINK-22376
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network, Runtime / Task
>    Affects Versions: 1.13.0
>            Reporter: Roman Khachatryan
>            Priority: Critical
>             Fix For: 1.14.0, 1.13.1
>
>
> In ChannelStateChunkReader.readChunk in case of error buffer is recycled in 
> the catch block. However, it might already have been recycled in 
> stateHandler.recover().
> Using minor priority, as this only affects already failing path.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to