[
https://issues.apache.org/jira/browse/FLINK-21181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414846#comment-17414846
]
Piotr Nowojski commented on FLINK-21181:
----------------------------------------
As a result of FLINK-24182, I basically have to revert this change and
reimplement the fix differently. On the clean cancellation path we don't want
to issue interrupts, so we have to close the buffer pools to unblock
back-pressured tasks/threads/mailbox actions. To that end, as part of
FLINK-24182 I will re-implement the fix for this bug (FLINK-21181) so that
access to a closed/destroyed LocalBufferPool results in a
{{CancelTaskException}} being thrown.
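
To make the intended behaviour concrete, here is a minimal, self-contained
sketch of the idea. It is not Flink's code: {{ToyBufferPool}}, its methods and
the local {{CancelTaskException}} class are hypothetical stand-ins, assumed
only for illustration. It shows a blocked (back-pressured) request being woken
up by closing the pool rather than by an interrupt, and signalling
cancellation instead of failing with an {{IllegalStateException}}.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for Flink's CancelTaskException; not the real class. */
class CancelTaskException extends RuntimeException {
    CancelTaskException(String message) {
        super(message);
    }
}

/** Toy buffer pool; illustrates the idea only, not LocalBufferPool itself. */
class ToyBufferPool {
    private final Queue<byte[]> available = new ArrayDeque<>();
    private boolean destroyed;

    ToyBufferPool(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            available.add(new byte[bufferSize]);
        }
    }

    /** Blocks until a buffer is free; throws CancelTaskException once the pool is destroyed. */
    synchronized byte[] requestBufferBlocking() throws InterruptedException {
        while (available.isEmpty() && !destroyed) {
            wait(); // back pressure: no free buffer yet
        }
        if (destroyed) {
            throw new CancelTaskException("Buffer pool has already been destroyed.");
        }
        return available.poll();
    }

    /** Returns a buffer to the pool and wakes up one or more waiting requesters. */
    synchronized void recycle(byte[] buffer) {
        if (!destroyed) {
            available.add(buffer);
            notifyAll();
        }
    }

    /** Closing the pool wakes blocked requesters without any Thread.interrupt(). */
    synchronized void destroy() {
        destroyed = true;
        available.clear();
        notifyAll();
    }
}

class ToyBufferPoolDemo {
    public static void main(String[] args) throws Exception {
        ToyBufferPool pool = new ToyBufferPool(1, 32);
        pool.requestBufferBlocking(); // take the only buffer; the next request must wait

        Thread task = new Thread(() -> {
            try {
                pool.requestBufferBlocking(); // blocks: no buffers left
            } catch (CancelTaskException e) {
                System.out.println("task unblocked: " + e.getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        task.start();
        Thread.sleep(100); // demo only: give the task time to block
        pool.destroy();    // clean cancellation: no interrupt is issued
        task.join();
    }
}
```

In this sketch the caller can tell "the task is being cancelled" apart from a
genuine failure by the exception type, which is the distinction the planned
fix relies on.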
> Buffer pool is destroyed error when outputting data over a timer after
> cancellation.
> ------------------------------------------------------------------------------------
>
> Key: FLINK-21181
> URL: https://issues.apache.org/jira/browse/FLINK-21181
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.11.0, 1.13.0, 1.14.0, 1.12.3
> Reporter: Arvid Heise
> Assignee: Roman Khachatryan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.14.0, 1.13.1, 1.12.4
>
>
> A [user
> reported|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/What-causes-a-buffer-pool-exception-How-can-I-mitigate-it-td40959.html]
> the issue and provided a TaskManager log with the following relevant
> lines:
> {noformat}
> 2021-01-26 04:37:43,280 INFO org.apache.flink.runtime.taskmanager.Task
> [] - Attempting to cancel task forward fill -> (Sink: tag db
> sink, Sink: back fill db sink, Sink: min max step db sink) (2/2)
> (8c1f256176fb89f112c27883350a02bc).
> 2021-01-26 04:37:43,280 INFO org.apache.flink.runtime.taskmanager.Task
> [] - forward fill -> (Sink: tag db sink, Sink: back fill db
> sink, Sink: min max step db sink) (2/2) (8c1f256176fb89f112c27883350a02bc)
> switched from RUNNING to CANCELING.
> 2021-01-26 04:37:43,280 INFO org.apache.flink.runtime.taskmanager.Task
> [] - Triggering cancellation of task code forward fill -> (Sink:
> tag db sink, Sink: back fill db sink, Sink: min max step db sink) (2/2)
> (8c1f256176fb89f112c27883350a02bc).
> 2021-01-26 04:37:43,282 ERROR
> xxxxxx.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction [] -
> Error in timer.
> java.lang.RuntimeException: Buffer pool is destroyed.
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> xxxxxx.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction.collect(ForwardFillKeyedProcessFunction.java:452)
> ~[develop-17e9fd0e.jar:?]
> at
> xxxxxx.pipeline.stream.functions.process.ForwardFillKeyedProcessFunction.onTimer(ForwardFillKeyedProcessFunction.java:277)
> ~[develop-17e9fd0e.jar:?]
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onProcessingTime(KeyedProcessOperator.java:78)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1181)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> [flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> [flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:270)
> [flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
> [flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> [flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
> [flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
> [flink-dist_2.12-1.11.0.jar:1.11.0]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> [develop-17e9fd0e.jar:?]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> [develop-17e9fd0e.jar:?]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:339)
> ~[develop-17e9fd0e.jar:?]
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:309)
> ~[develop-17e9fd0e.jar:?]
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:290)
> ~[develop-17e9fd0e.jar:?]
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)
> ~[develop-17e9fd0e.jar:?]
> at
> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:213)
> ~[develop-17e9fd0e.jar:?]
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:294)
> ~[develop-17e9fd0e.jar:?]
> at
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
> ~[develop-17e9fd0e.jar:?]
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:149)
> ~[develop-17e9fd0e.jar:?]
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
> ~[develop-17e9fd0e.jar:?]
> at
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
> ~[develop-17e9fd0e.jar:?]
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> ... 24 more
> {noformat}
>