[
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881129#comment-16881129
]
Yingjie Cao commented on FLINK-13100:
-------------------------------------
The deadlock problem of spillable subpartition has been reported in this jira:
https://issues.apache.org/jira/browse/FLINK-12329.
The following are some exception stacks which offer more information:
2019-07-09 12:35:54
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Fatal error at remote task manager
'hadoop5035.et2.tbsite.net/11.180.36.89:32383'.
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:269)
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Bug in BoundedBlockingSubpartition with FILE
data: Requesting new buffer before previous buffer returned.
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:267)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.channelWritabilityChanged(PartitionRequestQueue.java:204)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:434)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:416)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelWritabilityChanged(AbstractChannelHandlerContext.java:409)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelWritabilityChanged(ChannelInboundHandlerAdapter.java:119)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:434)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:416)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelWritabilityChanged(AbstractChannelHandlerContext.java:409)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelWritabilityChanged(ChannelInboundHandlerAdapter.java:119)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:434)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:416)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelWritabilityChanged(AbstractChannelHandlerContext.java:409)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelWritabilityChanged(DefaultChannelPipeline.java:1457)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:434)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:416)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelWritabilityChanged(DefaultChannelPipeline.java:977)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.fireChannelWritabilityChanged(ChannelOutboundBuffer.java:614)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.setWritable(ChannelOutboundBuffer.java:580)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.decrementPendingOutboundBytes(ChannelOutboundBuffer.java:194)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:259)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:338)
at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:428)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:938)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:54)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
... 2 more
Caused by: java.io.IOException: Bug in BoundedBlockingSubpartition with FILE
data: Requesting new buffer before previous buffer returned.
at
org.apache.flink.runtime.io.network.partition.FileChannelBoundedData$FileBufferReader.nextBuffer(FileChannelBoundedData.java:146)
at
org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader.getNextBuffer(BoundedBlockingSubpartitionReader.java:87)
at
org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.getNextBuffer(CreditBasedSequenceNumberingViewReader.java:160)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:227)
... 30 more
2019-07-09 11:47:05
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Fatal error at remote task manager
'hadoop5033.et2.tbsite.net/11.180.36.87:52978'.
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:269)
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Bug in BoundedBlockingSubpartition with FILE
data: Requesting new buffer before previous buffer returned.
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:267)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.access$100(PartitionRequestQueue.java:53)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue$WriteAndFlushNextMessageIfPossibleListener.operationComplete(PartitionRequestQueue.java:335)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue$WriteAndFlushNextMessageIfPossibleListener.operationComplete(PartitionRequestQueue.java:329)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:485)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:703)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:258)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:338)
at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:428)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:938)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:905)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1396)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:837)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1071)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:304)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:257)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:111)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:176)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:366)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1452)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:959)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:90)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
... 2 more
Caused by: java.io.IOException: Bug in BoundedBlockingSubpartition with FILE
data: Requesting new buffer before previous buffer returned.
at
org.apache.flink.runtime.io.network.partition.FileChannelBoundedData$FileBufferReader.nextBuffer(FileChannelBoundedData.java:146)
at
org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader.getNextBuffer(BoundedBlockingSubpartitionReader.java:87)
at
org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.getNextBuffer(CreditBasedSequenceNumberingViewReader.java:160)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:227)
... 52 more
> Fix the unexpected IOException during FileBufferReader#nextBuffer
> -----------------------------------------------------------------
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Network
> Reporter: zhijiang
> Priority: Blocker
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next
> memory segment always available based on the assumption that the nextBuffer
> call could only happen when the previous buffer was recycled before.
> Otherwise it would throw an IOException in current implementation.
> In fact, the above assumption is not making sense based on the credit-based
> and zero-copy features in network. The detail processes are as follows:
> * The netty thread finishes calling the channel.writeAndFlush() in
> PartitionRequestQueue and adds a listener to handle the ChannelFuture later.
> Before future done, the corresponding buffer is not recycled because of
> zero-copy improvement.
> * Before the previous future done, the netty thread could trigger next
> writeAndFlush via processing addCredit message, then
> FileBufferReader#nextBuffer would throw exception because of previous buffer
> not recycled.
> We thought of several ways for solving this potential bug:
> * It does not trigger the next writeAndFlush before the previous future
> done. To do so it has to maintain the future state and check it in relevant
> actions. I wonder it might bring performance regression in network throughput
> and bring extra state management.
> * Adjust the implementation of current FileBufferReader. We ever regarded
> the blocking partition view as always available based on the next buffer read
> ahead, so it would be always added into available queue in
> PartitionRequestQueue. Actually this next buffer ahead only simplifies the
> process of BoundedBlockingSubpartitionReader#notifyDataAvailable. The view
> availability could be judged based on available buffers in FileBufferReader
> instead of next buffer ahead. When the buffer is recycled into
> FileBufferReader after writeAndFlush done, it could call notifyDataAvailable
> to add this view into available queue in PartitionRequestQueue.
> I prefer the second way because it would not bring any bad impacts.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)