[
https://issues.apache.org/jira/browse/FLINK-22946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Piotr Nowojski reassigned FLINK-22946:
--------------------------------------
Assignee: Guokuai Huang
> Network buffer deadlock introduced by unaligned checkpoint
> ----------------------------------------------------------
>
> Key: FLINK-22946
> URL: https://issues.apache.org/jira/browse/FLINK-22946
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.13.0, 1.13.1
> Reporter: Guokuai Huang
> Assignee: Guokuai Huang
> Priority: Critical
> Labels: stale-blocker
> Fix For: 1.14.0, 1.13.2
>
> Attachments: Screen Shot 2021-06-09 at 6.39.47 PM.png, Screen Shot
> 2021-06-09 at 7.02.04 PM.png
>
>
> We recently encountered a deadlock when using unaligned checkpoints. Below are
> the two thread stacks involved in the deadlock:
> {code:java}
> "Channel state writer Join(xxxxxx) (34/256)#1":
>     at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
>     - waiting to lock <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>     at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
>     at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
>     - locked <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:173)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:131)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$0(ChannelStateWriteRequest.java:63)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$785/722492780.accept(Unknown Source)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$2(ChannelStateWriteRequest.java:93)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$786/1360749026.accept(Unknown Source)
>     at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:212)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:82)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:59)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl$$Lambda$253/502209879.run(Unknown Source)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> {code:java}
> "Join(xxxxxx) (34/256)#1":
>     at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
>     - waiting to lock <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>     at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
>     at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
>     - locked <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:95)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$226/1801850008.runDefaultAction(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$577/1653738667.run(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> The root cause of this problem is that unaligned checkpoints make it possible
> that, *under the same input gate, multiple input channels may recycle network
> buffers at the same time.*
> Previously, network buffer recycling only happened serially across the input
> channels of an input gate, because each sub-task processes its input data
> serially and an input gate belongs to exactly one sub-task. With unaligned
> checkpoints enabled, each input channel snapshots its in-flight buffers when it
> receives the checkpoint barrier, and network buffers may be recycled during
> that snapshot.
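>
> The cyclic wait shown in the two stacks above can be reproduced with a minimal
> sketch (plain Java objects standing in for the two AvailableBufferQueue
> monitors from the thread dump; this is illustrative code, not Flink code):
> {code:java}
> // Minimal deadlock sketch: two threads take the same two monitors in opposite
> // order, mirroring the "Join" task thread and the "Channel state writer"
> // thread in the dump above.
> public class QueueLockDeadlockSketch {
>     private static final Object queueA = new Object(); // plays 0x...dfa90
>     private static final Object queueB = new Object(); // plays 0x...bc450
>
>     public static void main(String[] args) {
>         // Task thread: recycles a buffer under its own queue lock (A); the
>         // recycled buffer is handed to the other channel, which needs lock B.
>         Thread taskThread = new Thread(() -> {
>             synchronized (queueA) {
>                 pause();
>                 synchronized (queueB) { /* never reached once both threads run */ }
>             }
>         }, "Join (task thread)");
>
>         // Channel state writer: the same pattern with the lock order reversed.
>         Thread channelStateWriter = new Thread(() -> {
>             synchronized (queueB) {
>                 pause();
>                 synchronized (queueA) { /* never reached */ }
>             }
>         }, "Channel state writer");
>
>         taskThread.start();
>         channelStateWriter.start();
>     }
>
>     private static void pause() {
>         try {
>             Thread.sleep(100); // widen the race window so the deadlock is reliable
>         } catch (InterruptedException e) {
>             Thread.currentThread().interrupt();
>         }
>     }
> }
> {code}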
> Unfortunately, *the current network buffer recycling mechanism does not account
> for multiple input channels recycling network buffers at the same time.* The
> code shown below is from
> org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue;
> it causes the deadlock when multiple input channels under the same input gate
> recycle network buffers concurrently.
> !Screen Shot 2021-06-09 at 7.02.04 PM.png!
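>
> For readers without the screenshot, the shape of the problem can be modelled
> with a self-contained sketch (a hypothetical SimpleBufferManager class, not the
> real BufferManager/AvailableBufferQueue source): the call that may reach
> another channel's notifyBufferAvailable(...) is made while this channel's
> bufferQueue monitor is still held.
> {code:java}
> // Hypothetical model of the problematic pattern, not the actual Flink code.
> class SimpleBufferManager {
>     private final Object bufferQueue = new Object(); // stands in for AvailableBufferQueue's monitor
>     private int availableBuffers;
>     private SimpleBufferManager peer; // another input channel of the same gate
>
>     void setPeer(SimpleBufferManager peer) {
>         this.peer = peer;
>     }
>
>     // Queue bookkeeping needs the lock, but the peer notification (standing in
>     // for the LocalBufferPool notification path) is also made under that lock,
>     // so this thread needs the peer's lock while still holding its own.
>     void recycle() {
>         synchronized (bufferQueue) {
>             availableBuffers++;           // bookkeeping under our lock
>             peer.notifyBufferAvailable(); // peer call made under our lock (problem)
>         }
>     }
>
>     void notifyBufferAvailable() {
>         synchronized (bufferQueue) {
>             availableBuffers++;
>         }
>     }
> }
> {code}
> If two such channels recycle at the same time, each holds its own bufferQueue
> lock while blocking on the other's, which is exactly the cycle in the thread
> dumps above.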
> The solution to this problem is quite straightforward. Here are two possible
> solutions:
> *1. Case-by-case solution.* In the deadlock, input channel A (holding lock A)
> hands the released network buffer to input channel B (waiting for lock B),
> while input channel B (holding lock B) hands its released network buffer to
> input channel A (waiting for lock A). So when an input channel releases a
> network buffer, it can first check whether it is itself waiting for a network
> buffer and, if so, allocate the buffer to itself directly. This avoids the
> situation where different input channels exchange network buffers.
> *2. A straightforward solution.* The input channel only needs to hold the
> bufferQueue lock while removing the network buffer from the bufferQueue; the
> subsequent operations do not need it. Therefore, it is enough to move
> Buffer::recycleBuffer outside the bufferQueue lock to avoid the deadlock (a
> sketch of this option follows below).
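>
> On the model above, option 2 could look like the following sketch (again a
> hypothetical illustration, not the actual patch): do the queue bookkeeping
> under the lock, but call into the peer only after the lock is released, so no
> thread ever holds two bufferQueue locks at once.
> {code:java}
> // Hypothetical reworking of SimpleBufferManager.recycle() from the model above.
> void recycle() {
>     boolean notifyPeer;
>     synchronized (bufferQueue) {
>         availableBuffers++; // only queue bookkeeping happens under the lock
>         notifyPeer = true;  // decide what to release, defer the actual call
>     }
>     if (notifyPeer) {
>         // Outside our bufferQueue lock: the peer only acquires its own lock,
>         // so the circular wait from the thread dumps can no longer form.
>         peer.notifyBufferAvailable();
>     }
> }
> {code}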