[ https://issues.apache.org/jira/browse/FLINK-22424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17330105#comment-17330105 ]

Piotr Nowojski commented on FLINK-22424:
----------------------------------------

Because writing to those released memory segments happens only while the job is 
already being cancelled/failing. For this data corruption to be visible, 
another task, from another job (or a different failover region?), would have to 
be present and keep running, and that 2nd task would also need to immediately 
acquire the just-released buffer. So the impact is limited.
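
To make the scenario concrete, here is a minimal, self-contained sketch of that 
hazard against the MemorySegment API (illustrative only, not the actual 
network-stack code path; the pool hand-off between the two tasks is collapsed 
into plain method calls on one segment):

{code:java}
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

// Illustrative sketch only: a late write from a task that is being
// cancelled lands in a segment that has already been handed back to the
// pool and re-acquired by another task, silently corrupting its data.
public class UseAfterRecycleSketch {

    public static void main(String[] args) {
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(64);

        // Task B has just acquired the (recycled) segment and written its record.
        segment.put(0, (byte) 42);

        // Task A (cancelling/failing) still holds a stale reference and performs
        // one more write -- nothing fails, task B's data is silently overwritten.
        segment.put(0, (byte) 7);

        System.out.println("task B reads back: " + segment.get(0)); // 7, not 42
    }
}
{code}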

> Writing to already released buffers potentially causing data corruption
> -----------------------------------------------------------------------
>
>                 Key: FLINK-22424
>                 URL: https://issues.apache.org/jira/browse/FLINK-22424
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.6.4, 1.7.2, 1.8.3, 1.9.3, 1.10.3, 1.11.3, 1.12.2, 1.13.0
>            Reporter: Piotr Nowojski
>            Priority: Critical
>
> I modified the code to not re-use the same memory segments, but to always 
> free the segment on recycling (a sketch of such a recycler follows after the 
> stack trace). What I observed is a problem similar to the one reported in 
> FLINK-21181, but even more severe:
> {noformat}
> Caused by: java.lang.RuntimeException: segment has been freed
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:93)
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
>       at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>       at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>       at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
>       at org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase$ReEmitAll.process(UnalignedCheckpointStressITCase.java:477)
>       at org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase$ReEmitAll.process(UnalignedCheckpointStressITCase.java:468)
>       at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
>       at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:577)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533)
>       at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1395)
>       ... 11 more
> Caused by: java.lang.IllegalStateException: segment has been freed
>       at org.apache.flink.core.memory.MemorySegment.put(MemorySegment.java:483)
>       at org.apache.flink.core.memory.MemorySegment.put(MemorySegment.java:1398)
>       at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:100)
>       at org.apache.flink.runtime.io.network.buffer.BufferBuilder.appendAndCommit(BufferBuilder.java:82)
>       at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForNewRecord(BufferWritingResultPartition.java:250)
>       at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:142)
>       at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>       at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>       ... 24 more
> {noformat}
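> For context, the change that produced the trace above is roughly equivalent to 
> plugging in a recycler like the following (a hypothetical debug-only sketch, 
> not the actual modification); once segments are freed on recycle, a late write 
> fails loudly with "segment has been freed" instead of silently landing in a 
> re-used buffer:
> {code:java}
> import org.apache.flink.core.memory.MemorySegment;
> import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
> 
> // Hypothetical debug recycler: instead of returning the segment to the pool
> // for re-use, free it immediately so that any late write throws
> // IllegalStateException("segment has been freed"), as in the trace above.
> public class FreeOnRecycleDebugRecycler implements BufferRecycler {
>     @Override
>     public void recycle(MemorySegment memorySegment) {
>         memorySegment.free();
>     }
> }
> {code}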
> This also happens during cancellation/job failover. It fails when trying to 
> write to an already freed buffer. Without my changes, this code would 
> silently write some data to a buffer that has already been recycled/returned 
> to the pool. If someone else picked up this buffer, it could easily lead to 
> data corruption.
> As far as I can tell, the exact reason behind this is that the buffer to 
> which the timer attempts to write has already been released from 
> `ResultSubpartition#onConsumedSubpartition`, causing the `BufferConsumer` to 
> be closed (which recycles/frees the underlying memory segment), while the 
> matching `BufferBuilder` is still being used...
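> Roughly, the situation boils down to something like the sketch below 
> (simplified stand-in classes, not the real `BufferBuilder`/`BufferConsumer`; 
> in the real code the two sides share the segment through the result 
> partition's buffer pool):
> {code:java}
> import java.util.concurrent.atomic.AtomicBoolean;
> 
> // Simplified stand-ins for a builder/consumer pair sharing one segment.
> class Segment {
>     private final byte[] data = new byte[32 * 1024];
>     private final AtomicBoolean freed = new AtomicBoolean(false);
> 
>     void put(int index, byte b) {
>         if (freed.get()) {
>             throw new IllegalStateException("segment has been freed");
>         }
>         data[index] = b;
>     }
> 
>     void free() {
>         freed.set(true);
>     }
> }
> 
> class Builder {
>     private final Segment segment;
>     private int position;
> 
>     Builder(Segment segment) {
>         this.segment = segment;
>     }
> 
>     // Keeps appending even if the consumer side already released the segment.
>     void append(byte b) {
>         segment.put(position++, b);
>     }
> }
> 
> class Consumer implements AutoCloseable {
>     private final Segment segment;
> 
>     Consumer(Segment segment) {
>         this.segment = segment;
>     }
> 
>     @Override
>     public void close() {
>         segment.free(); // closing the consumer recycles/frees the segment
>     }
> }
> 
> public class BuilderConsumerRaceSketch {
>     public static void main(String[] args) {
>         Segment segment = new Segment();
>         Builder builder = new Builder(segment);
>         Consumer consumer = new Consumer(segment);
> 
>         // Downstream consumption goes away (cancellation/failover): the
>         // subpartition closes the consumer, which frees the segment...
>         consumer.close();
> 
>         // ...while the upstream timer callback still writes through the
>         // matching builder.
>         try {
>             builder.append((byte) 1);
>         } catch (IllegalStateException e) {
>             System.out.println(e.getMessage()); // "segment has been freed"
>         }
>     }
> }
> {code}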



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
