[
https://issues.apache.org/jira/browse/FLINK-22946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17363478#comment-17363478
]
Guokuai Huang edited comment on FLINK-22946 at 6/15/21, 8:38 AM:
-----------------------------------------------------------------
Hi [~pnowojski], thanks for your reply.
1. Regarding your concern about the output side: in credit-based flow control mode,
floating network buffers are shared among input channels, so in our case the problem
is that two input channels swap floating network buffers with each other. On the
output side there is no floating network buffer mechanism, so this problem cannot
occur there. Please correct me if I am wrong.
2. Regarding the questions about the first solution: BufferManager::numRequiredBuffers
is adjusted dynamically based on backlog feedback (the real-time number of buffers
backlogged in the subpartition). So even when the following condition is true, it does
not mean that the manager's listener has been triggered.
{code:java}
if (getAvailableBufferSize() > numRequiredBuffers) {}
{code}
This is because the listener may be registered first, and the value of
numRequiredBuffers may then be reduced by subsequent backlog changes. At that moment,
channel A and channel B do not really need more network buffers; each is calling the
other's notifyBufferAvailable only to find out that the other side does not need a
buffer, and then to remove the other's listener. *To be precise, the first solution is
to remove the channel's own outdated listener when it releases a floating buffer* (a
rough sketch is attached at the end of this comment).
Of course, I personally prefer the second solution, which is more general and
can avoid similar deadlocks.
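As a rough sketch of that first solution (simplified, with hypothetical names; this is
not the actual BufferManager/LocalBufferPool API, and the backlog formula is only
illustrative):
{code:java}
// Rough sketch only (hypothetical names, not the real Flink API): numRequiredBuffers
// can shrink after the listener was registered, so the registration becomes outdated
// and should be removed when the channel releases a floating buffer.
class FloatingBufferBookkeeping {
    private int availableBuffers;     // floating buffers this channel currently holds
    private int numRequiredBuffers;   // adjusted dynamically from backlog feedback
    private boolean listenerRegistered;

    synchronized void onSenderBacklog(int backlog) {
        // Backlog feedback can lower the requirement below what we already hold.
        numRequiredBuffers = backlog; // illustrative only; the real formula differs
    }

    synchronized void releaseFloatingBuffer() {
        availableBuffers--;
        if (availableBuffers >= numRequiredBuffers && listenerRegistered) {
            // No more buffers are needed: deregister the outdated listener here
            // instead of letting another channel call back into it later.
            listenerRegistered = false; // stands in for pool.removeListener(this)
        }
    }
}
{code}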
> Network buffer deadlock introduced by unaligned checkpoint
> ----------------------------------------------------------
>
> Key: FLINK-22946
> URL: https://issues.apache.org/jira/browse/FLINK-22946
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.13.0, 1.13.1
> Reporter: Guokuai Huang
> Priority: Critical
> Labels: stale-blocker
> Fix For: 1.14.0, 1.13.2
>
> Attachments: Screen Shot 2021-06-09 at 6.39.47 PM.png, Screen Shot
> 2021-06-09 at 7.02.04 PM.png
>
>
> We recently encountered a deadlock when using unaligned checkpoint. Below are the
> two thread stacks that cause the deadlock:
> {code:java}
> "Channel state writer Join(xxxxxx) (34/256)#1":
> at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
> - waiting to lock <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
> at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
> at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
> at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
> at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
> at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
> at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
> - locked <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
> at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
> at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
> at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
> at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
> at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:173)
> at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:131)
> at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$0(ChannelStateWriteRequest.java:63)
> at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$785/722492780.accept(Unknown Source)
> at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$2(ChannelStateWriteRequest.java:93)
> at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$786/1360749026.accept(Unknown Source)
> at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:212)
> at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:82)
> at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:59)
> at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
> at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
> at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl$$Lambda$253/502209879.run(Unknown Source)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> {code:java}
> "Join(xxxxxx) (34/256)#1":
> at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
> - waiting to lock <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
> at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
> at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
> at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
> at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
> at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
> at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
> - locked <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
> at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
> at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
> at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
> at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:95)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$226/1801850008.runDefaultAction(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$577/1653738667.run(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> The root cause of this problem is that unaligned checkpoint makes it possible
> that, *under the same input gate,* *multiple input channels may recycle network
> buffers at the same time.*
> Previously, network buffer recycling could only happen serially among the input
> channels of the same input gate, because each sub-task processes input data
> serially and an input gate belongs to exactly one sub-task. When unaligned
> checkpoint is enabled, each input channel takes a snapshot of its buffered data
> when it receives the checkpoint barrier, and network buffers may be recycled
> during that process.
> Unfortunately, *the current network buffer recycling mechanism does not take
> into account the situation where multiple input channels recycle network buffers
> at the same time.* The following code block, from
> org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue,
> causes a deadlock when multiple input channels under the same input gate recycle
> network buffers at the same time.
> !Screen Shot 2021-06-09 at 7.02.04 PM.png!
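> Since the relevant code is only attached as a screenshot, here is a minimal,
> simplified sketch (hypothetical types, not the actual Flink source) of the lock
> pattern that leads to the deadlock when two channels recycle buffers concurrently
> and notify each other:
> {code:java}
> // Simplified sketch of the problematic pattern (hypothetical types, not Flink code).
> // Each channel recycles a buffer while holding its own queue lock and, still inside
> // that critical section, notifies the listener of the other channel, which needs
> // the other channel's queue lock: a classic lock-ordering deadlock.
> class SimplifiedBufferManager {
>     private final Object bufferQueueLock = new Object();
>
>     // Called when this channel gives back a floating buffer.
>     void recycle(Object buffer, SimplifiedBufferManager peer) {
>         synchronized (bufferQueueLock) {
>             // ... return 'buffer' to the available queue / pool ...
>             // Still holding our own lock, hand the freed buffer to the peer.
>             peer.notifyBufferAvailable(buffer); // needs peer.bufferQueueLock
>         }
>     }
>
>     // Invoked by the peer; needs this channel's lock to enqueue the buffer.
>     void notifyBufferAvailable(Object buffer) {
>         synchronized (bufferQueueLock) {
>             // ... add 'buffer' to this channel's available queue ...
>         }
>     }
> }
> // If channel A runs recycle(.., B) while channel B runs recycle(.., A), then A holds
> // A's lock and waits for B's lock while B holds B's lock and waits for A's lock.
> {code}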
> The solution to this problem is quite straightforward. Here are two possible
> solutions:
> *1. Case-by-case solution.* Note that input channel A (holding lock A) gave its
> released network buffer to input channel B (while waiting for lock B), and input
> channel B (holding lock B) gave its released network buffer to input channel A
> (while waiting for lock A). So when an input channel releases a network buffer, it
> should first check whether it is itself waiting for network buffers, and if so,
> allocate the buffer directly to itself. This avoids the situation where different
> input channels exchange network buffers with each other (see the first sketch
> after this list).
> 2. *A straightforward solution.* Since the input channel only needs to hold the
> lock during recycle while removing the network buffer from the bufferQueue, the
> subsequent operations do not need to hold this lock. Therefore, we only need to
> move Buffer::recycleBuffer outside the bufferQueue lock to avoid the deadlock (see
> the second sketch after this list).
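> A minimal sketch of option 1, written as a method inside the simplified
> SimplifiedBufferManager sketched above; the fields availableBuffers and
> numRequiredBuffers are assumed bookkeeping for illustration, not the actual
> BufferManager code:
> {code:java}
> // Sketch of option 1 (hypothetical, simplified): allocate the released buffer to
> // ourselves first if we are still below our own requirement; only surplus buffers
> // are offered to other channels, so two channels never swap buffers with each other.
> void recycleFloatingBuffer(Object buffer, SimplifiedBufferManager peer) {
>     boolean keptLocally;
>     synchronized (bufferQueueLock) {
>         if (availableBuffers < numRequiredBuffers) {
>             // We are waiting for floating buffers ourselves: reuse it directly.
>             availableBuffers++;
>             keptLocally = true;
>         } else {
>             keptLocally = false;
>         }
>     }
>     if (!keptLocally) {
>         peer.notifyBufferAvailable(buffer);
>     }
> }
> {code}
> And a minimal sketch of option 2 in the same simplified setting (again not the
> actual BufferManager code): the bookkeeping happens under the bufferQueue lock,
> while the actual recycling / listener notification happens only after the lock
> has been released:
> {code:java}
> // Sketch of option 2 (hypothetical, simplified): decide what to release while
> // holding our own lock, but perform the recycling / notification only after
> // leaving the critical section, so no other channel's lock is ever taken while
> // our own lock is held.
> void recycle(Object buffer, SimplifiedBufferManager peer) {
>     Object bufferToRelease = null;
>     synchronized (bufferQueueLock) {
>         // ... update queue state; conclude that 'buffer' is surplus ...
>         bufferToRelease = buffer;
>     }
>     // Outside the lock: safe to call into components that take their own locks.
>     if (bufferToRelease != null) {
>         peer.notifyBufferAvailable(bufferToRelease);
>     }
> }
> {code}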