[ 
https://issues.apache.org/jira/browse/FLINK-22424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17330071#comment-17330071
 ] 

Piotr Nowojski commented on FLINK-22424:
----------------------------------------

As of now, I don't understand why this issue shows up only when processing-time 
timers are triggered. If that is really the case, the issue is probably more 
recent and does not affect versions before 1.10.x or so.

Also, I think the impact might be limited to setups where more than one job 
is running on the same cluster.

> Writing to already released buffers potentially causing data corruption
> -----------------------------------------------------------------------
>
>                 Key: FLINK-22424
>                 URL: https://issues.apache.org/jira/browse/FLINK-22424
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.6.4, 1.7.2, 1.8.3, 1.9.3, 1.10.3, 1.11.3, 1.12.2, 1.13.0
>            Reporter: Piotr Nowojski
>            Priority: Critical
>
> I modified the code to not re-use the same memory segments, but to always 
> free the segment on recycling. With that change I observed a problem similar 
> to the one reported in FLINK-21181, but even more severe:
> {noformat}
> Caused by: java.lang.RuntimeException: segment has been freed
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:93)
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
>       at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>       at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>       at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
>       at org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase$ReEmitAll.process(UnalignedCheckpointStressITCase.java:477)
>       at org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase$ReEmitAll.process(UnalignedCheckpointStressITCase.java:468)
>       at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
>       at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:577)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533)
>       at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1395)
>       ... 11 more
> Caused by: java.lang.IllegalStateException: segment has been freed
>       at org.apache.flink.core.memory.MemorySegment.put(MemorySegment.java:483)
>       at org.apache.flink.core.memory.MemorySegment.put(MemorySegment.java:1398)
>       at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:100)
>       at org.apache.flink.runtime.io.network.buffer.BufferBuilder.appendAndCommit(BufferBuilder.java:82)
>       at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForNewRecord(BufferWritingResultPartition.java:250)
>       at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:142)
>       at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>       at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>       ... 24 more
> {noformat}
> This also happens during cancellation/job failover. The write fails because 
> it targets a buffer that has already been freed. Without my changes, this 
> code would silently write data to a buffer that has already been 
> recycled/returned to the pool. If someone else picked up this buffer, it 
> could easily lead to data corruption.
> As far as I can tell, the exact reason is that the buffer the timer attempts 
> to write to has been released from 
> `ResultSubpartition#onConsumedSubpartition`, causing the `BufferConsumer` to 
> be closed (which recycles/frees the underlying memory segment), while the 
> matching `BufferBuilder` is still being used...
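
The failure mode described in the quoted issue can be illustrated with a toy 
model. This is only a sketch under stated assumptions, not Flink code: 
`Segment`, `Pool` and both helper methods are hypothetical stand-ins for the 
memory segment, the buffer pool, and the writer/consumer pair. It shows why a 
stale write goes unnoticed when recycled segments are re-used, and why 
freeing the segment on recycling turns it into a visible 
`IllegalStateException`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of the problem; all names here are hypothetical, not Flink classes. */
final class SegmentReuseSketch {

    /** Stand-in for a memory segment: a small byte array that can be freed. */
    static final class Segment {
        private byte[] data = new byte[4];

        void put(int index, byte value) {
            if (data == null) {
                throw new IllegalStateException("segment has been freed");
            }
            data[index] = value;
        }

        byte get(int index) {
            if (data == null) {
                throw new IllegalStateException("segment has been freed");
            }
            return data[index];
        }

        void free() {
            data = null;
        }
    }

    /** Pool that either re-uses recycled segments or frees them on recycling. */
    static final class Pool {
        private final Deque<Segment> available = new ArrayDeque<>();
        private final boolean freeOnRecycle;

        Pool(boolean freeOnRecycle) {
            this.freeOnRecycle = freeOnRecycle;
        }

        Segment request() {
            Segment recycled = available.poll();
            return recycled != null ? recycled : new Segment();
        }

        void recycle(Segment segment) {
            if (freeOnRecycle) {
                segment.free();          // debugging mode: never hand it out again
            } else {
                available.push(segment); // normal mode: segment goes back to the pool
            }
        }
    }

    /** Re-use mode: returns what the new owner reads after a stale write. */
    static byte staleWriteWithReuse() {
        Pool pool = new Pool(false);
        Segment writerView = pool.request(); // writer side keeps this reference
        pool.recycle(writerView);            // consumer side releases it too early
        Segment other = pool.request();      // another user receives the same segment
        other.put(0, (byte) 1);
        writerView.put(0, (byte) 42);        // stale write, no error raised
        return other.get(0);                 // the other user's data was overwritten
    }

    /** Free-on-recycle mode: returns true if the stale write is detected. */
    static boolean staleWriteDetectedWhenFreed() {
        Pool pool = new Pool(true);
        Segment writerView = pool.request();
        pool.recycle(writerView);
        try {
            writerView.put(0, (byte) 42);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

In re-use mode the second owner reads back the stale writer's value instead 
of its own, which is the silent corruption scenario. In free-on-recycle mode 
the same write fails immediately, matching the `segment has been freed` 
message in the stack trace above.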



--
This message was sent by Atlassian Jira
(v8.3.4#803005)