[ https://issues.apache.org/jira/browse/FLINK-21181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17329201#comment-17329201 ]

Piotr Nowojski edited comment on FLINK-21181 at 4/23/21, 7:39 AM:
------------------------------------------------------------------

I think I've found a related issue. I've modified the code so that memory
segments are no longer reused: on recycling, a segment is always freed. With
that change I observed a problem similar to the one reported in this ticket,
but even more severe:

{noformat}
Caused by: java.lang.RuntimeException: segment has been freed
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:93)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
        at org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase$ReEmitAll.process(UnalignedCheckpointStressITCase.java:477)
        at org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase$ReEmitAll.process(UnalignedCheckpointStressITCase.java:468)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:577)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1395)
        ... 11 more
Caused by: java.lang.IllegalStateException: segment has been freed
        at org.apache.flink.core.memory.MemorySegment.put(MemorySegment.java:483)
        at org.apache.flink.core.memory.MemorySegment.put(MemorySegment.java:1398)
        at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:100)
        at org.apache.flink.runtime.io.network.buffer.BufferBuilder.appendAndCommit(BufferBuilder.java:82)
        at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForNewRecord(BufferWritingResultPartition.java:250)
        at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:142)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
        at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
        ... 24 more
{noformat}
This also happens during cancellation/job failover. It looks to me that in the
case you analysed, a processing-time timer tried to request a buffer from an
already destroyed buffer pool. My case looks identical on the surface, but it
fails when trying to write to a buffer that has already been `free`'d. Without
my changes, this code would silently write data into a buffer that had already
been recycled/returned to the pool. If someone else picked up that buffer,
this could easily lead to data corruption.
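To illustrate why freeing segments on recycle turns silent corruption into a loud failure, here is a minimal toy model. It is a sketch under stated assumptions, not Flink code: `Segment` and `Pool` are hypothetical stand-ins for `MemorySegment` and the buffer pool, and the "stale" reference plays the role of the timer still holding a buffer after it was recycled during cancellation.

```java
import java.util.ArrayDeque;

/**
 * Toy model (hypothetical, NOT Flink's MemorySegment/LocalBufferPool) showing
 * what a stale write does when recycled segments are reused vs. freed.
 */
class UseAfterRecycleDemo {

    /** Hypothetical stand-in for a memory segment. */
    static final class Segment {
        private byte[] memory;

        Segment(int size) { memory = new byte[size]; }

        void put(int index, byte value) {
            if (memory == null) {
                throw new IllegalStateException("segment has been freed");
            }
            memory[index] = value;
        }

        byte get(int index) { return memory[index]; }

        void free() { memory = null; }
    }

    /** Hypothetical pool: a recycled segment is either handed out again or freed. */
    static final class Pool {
        private final ArrayDeque<Segment> available = new ArrayDeque<>();
        private final boolean freeOnRecycle;
        private final int segmentSize;

        Pool(int segmentSize, boolean freeOnRecycle) {
            this.segmentSize = segmentSize;
            this.freeOnRecycle = freeOnRecycle;
        }

        Segment request() {
            Segment s = available.poll();
            return s != null ? s : new Segment(segmentSize);
        }

        void recycle(Segment s) {
            if (freeOnRecycle) {
                s.free();           // stale holders fail on their next write
            } else {
                available.push(s);  // stale holders silently share memory with the next owner
            }
        }
    }

    /** Returns what the new owner reads after a stale writer wrote through the old reference. */
    static byte staleWriteWithReuse() {
        Pool pool = new Pool(4, false);
        Segment stale = pool.request();
        pool.recycle(stale);               // e.g. released during cancellation
        Segment next = pool.request();     // same memory handed to another owner
        next.put(0, (byte) 1);
        stale.put(0, (byte) 42);           // late timer writes through the stale reference
        return next.get(0);                // the new owner's data was overwritten
    }

    /** Returns the message of the exception raised by the stale write when segments are freed. */
    static String staleWriteWithFree() {
        Pool pool = new Pool(4, true);
        Segment stale = pool.request();
        pool.recycle(stale);
        try {
            stale.put(0, (byte) 42);
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("reuse: next owner reads " + staleWriteWithReuse()); // 42
        System.out.println("free:  " + staleWriteWithFree()); // segment has been freed
    }
}
```

With reuse, the stale write succeeds and clobbers the next owner's byte; with freeing, the same write throws, which is the behaviour the debugging change above surfaces.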

As far as I can tell, the exact cause of my exception is a bit different. The
buffer that the timer attempts to write to has already been released from
within `ResultSubpartition#onConsumedSubpartition`, which closes the
`BufferConsumer` (thereby recycling/freeing the underlying memory segment)
while the matching `BufferBuilder` is still in use...
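One general way to rule out this kind of race is to reference-count the segment shared by the writer and reader sides, so that closing the consumer only drops its own reference. The sketch below uses hypothetical toy classes (`RefCountedSegment`, `ToyBufferBuilder`, `ToyBufferConsumer`), not Flink's `BufferBuilder`/`BufferConsumer`, and is not necessarily how this ticket was or will be resolved:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model (hypothetical, NOT Flink's classes) of a segment shared by a writer and a reader. */
class SharedSegmentDemo {

    /** Hypothetical segment that is freed only when its last reference is released. */
    static final class RefCountedSegment {
        private final AtomicInteger refCnt = new AtomicInteger(1); // creator holds one reference
        private volatile byte[] memory;

        RefCountedSegment(int size) { memory = new byte[size]; }

        /** Takes an extra reference; refuses to resurrect an already freed segment. */
        RefCountedSegment retain() {
            int current;
            do {
                current = refCnt.get();
                if (current == 0) {
                    throw new IllegalStateException("segment has been freed");
                }
            } while (!refCnt.compareAndSet(current, current + 1));
            return this;
        }

        /** Drops one reference; the last release frees the memory. */
        void release() {
            int remaining = refCnt.decrementAndGet();
            if (remaining == 0) {
                memory = null;
            } else if (remaining < 0) {
                throw new IllegalStateException("released more often than retained");
            }
        }

        boolean isFreed() { return memory == null; }

        void put(int index, byte value) {
            byte[] m = memory;
            if (m == null) {
                throw new IllegalStateException("segment has been freed");
            }
            m[index] = value;
        }
    }

    /** Hypothetical writer side; owns the segment's initial reference. */
    static final class ToyBufferBuilder implements AutoCloseable {
        final RefCountedSegment segment;

        ToyBufferBuilder(RefCountedSegment segment) { this.segment = segment; }

        /** The reader side gets its own reference instead of sharing the builder's. */
        ToyBufferConsumer createBufferConsumer() { return new ToyBufferConsumer(segment.retain()); }

        void append(int index, byte value) { segment.put(index, value); }

        @Override
        public void close() { segment.release(); }
    }

    /** Hypothetical reader side; closing it releases only its own reference. */
    static final class ToyBufferConsumer implements AutoCloseable {
        private final RefCountedSegment segment;

        ToyBufferConsumer(RefCountedSegment segment) { this.segment = segment; }

        @Override
        public void close() { segment.release(); }
    }

    /** Consumer released first (as on subpartition release) while the builder is still writing. */
    static String consumerClosedFirst() {
        ToyBufferBuilder builder = new ToyBufferBuilder(new RefCountedSegment(4));
        ToyBufferConsumer consumer = builder.createBufferConsumer();
        consumer.close();                 // analogous to the consumer being closed on release
        builder.append(0, (byte) 7);      // still safe: the builder holds its own reference
        boolean freedWhileBuilderOpen = builder.segment.isFreed();
        builder.close();                  // last reference gone, memory is freed now
        return freedWhileBuilderOpen + "," + builder.segment.isFreed();
    }

    public static void main(String[] args) {
        System.out.println(consumerClosedFirst()); // prints "false,true"
    }
}
```

Data appended after the consumer is closed is still lost; the point is only that the memory cannot be recycled and handed to another owner while the builder may still write into it.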


> Buffer pool is destroyed error when outputting data over a timer after 
> cancellation.
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-21181
>                 URL: https://issues.apache.org/jira/browse/FLINK-21181
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.11.0
>            Reporter: Arvid Heise
>            Priority: Major
>
> A [user reported|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/What-causes-a-buffer-pool-exception-How-can-I-mitigate-it-td40959.html]
> the issue and provided a TaskManager log with the following relevant lines:
> {noformat}
> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to cancel task forward fill -> (Sink: tag db sink, Sink: back fill db sink, Sink: min max step db sink) (2/2) (8c1f256176fb89f112c27883350a02bc).
> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - forward fill -> (Sink: tag db sink, Sink: back fill db sink, Sink: min max step db sink) (2/2) (8c1f256176fb89f112c27883350a02bc) switched from RUNNING to CANCELING.
> 2021-01-26 04:37:43,280 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Triggering cancellation of task code forward fill -> (Sink: tag db sink, Sink: back fill db sink, Sink: min max step db sink) (2/2) (8c1f256176fb89f112c27883350a02bc).
> 2021-01-26 04:37:43,282 ERROR xxxxxx.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction [] - Error in timer.
> java.lang.RuntimeException: Buffer pool is destroyed.
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>       at xxxxxx.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction.collect(ForwardFillKeyedProcessFunction.java:452) ~[develop-17e9fd0e.jar:?]
>       at xxxxxx.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction.onTimer(ForwardFillKeyedProcessFunction.java:277) ~[develop-17e9fd0e.jar:?]
>       at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onProcessingTime(KeyedProcessOperator.java:78) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1181) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) [flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:270) [flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) [flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) [flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) [flink-dist_2.12-1.11.0.jar:1.11.0]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [develop-17e9fd0e.jar:?]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [develop-17e9fd0e.jar:?]
>       at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>       at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:339) ~[develop-17e9fd0e.jar:?]
>       at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:309) ~[develop-17e9fd0e.jar:?]
>       at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:290) ~[develop-17e9fd0e.jar:?]
>       at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266) ~[develop-17e9fd0e.jar:?]
>       at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:213) ~[develop-17e9fd0e.jar:?]
>       at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:294) ~[develop-17e9fd0e.jar:?]
>       at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103) ~[develop-17e9fd0e.jar:?]
>       at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:149) ~[develop-17e9fd0e.jar:?]
>       at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120) ~[develop-17e9fd0e.jar:?]
>       at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60) ~[develop-17e9fd0e.jar:?]
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>       ... 24 more
> {noformat}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
