[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881129#comment-16881129 ]
Yingjie Cao commented on FLINK-13100: ------------------------------------- The deadlock problem of spillable subpartition has been reported in this jira: https://issues.apache.org/jira/browse/FLINK-12329. The following are some exception stacks which offer more information: 2019-07-09 12:35:54 org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Fatal error at remote task manager 'hadoop5035.et2.tbsite.net/11.180.36.89:32383'. at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:269) at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) at java.lang.Thread.run(Thread.java:834) Caused by: java.io.IOException: Bug in BoundedBlockingSubpartition with FILE data: Requesting new buffer before previous buffer returned. at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:267) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.channelWritabilityChanged(PartitionRequestQueue.java:204) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:434) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:416) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelWritabilityChanged(AbstractChannelHandlerContext.java:409) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelWritabilityChanged(ChannelInboundHandlerAdapter.java:119) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:434) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:416) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelWritabilityChanged(AbstractChannelHandlerContext.java:409) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelWritabilityChanged(ChannelInboundHandlerAdapter.java:119) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:434) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:416) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelWritabilityChanged(AbstractChannelHandlerContext.java:409) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelWritabilityChanged(DefaultChannelPipeline.java:1457) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:434) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:416) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelWritabilityChanged(DefaultChannelPipeline.java:977) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.fireChannelWritabilityChanged(ChannelOutboundBuffer.java:614) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.setWritable(ChannelOutboundBuffer.java:580) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.decrementPendingOutboundBytes(ChannelOutboundBuffer.java:194) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:259) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:338) at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:428) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:938) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:54) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474) ... 2 more Caused by: java.io.IOException: Bug in BoundedBlockingSubpartition with FILE data: Requesting new buffer before previous buffer returned. at org.apache.flink.runtime.io.network.partition.FileChannelBoundedData$FileBufferReader.nextBuffer(FileChannelBoundedData.java:146) at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader.getNextBuffer(BoundedBlockingSubpartitionReader.java:87) at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.getNextBuffer(CreditBasedSequenceNumberingViewReader.java:160) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:227) ... 30 more 2019-07-09 11:47:05 org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Fatal error at remote task manager 'hadoop5033.et2.tbsite.net/11.180.36.87:52978'. at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:269) at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) at java.lang.Thread.run(Thread.java:834) Caused by: java.io.IOException: Bug in BoundedBlockingSubpartition with FILE data: Requesting new buffer before previous buffer returned. at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:267) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.access$100(PartitionRequestQueue.java:53) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue$WriteAndFlushNextMessageIfPossibleListener.operationComplete(PartitionRequestQueue.java:335) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue$WriteAndFlushNextMessageIfPossibleListener.operationComplete(PartitionRequestQueue.java:329) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:485) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103) at org.apache.flink.shaded.netty4.io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:703) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:258) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:338) at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:428) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:938) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:905) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1396) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:837) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1071) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:304) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:257) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:111) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:176) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:366) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1452) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:959) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:90) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474) ... 2 more Caused by: java.io.IOException: Bug in BoundedBlockingSubpartition with FILE data: Requesting new buffer before previous buffer returned. at org.apache.flink.runtime.io.network.partition.FileChannelBoundedData$FileBufferReader.nextBuffer(FileChannelBoundedData.java:146) at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader.getNextBuffer(BoundedBlockingSubpartitionReader.java:87) at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.getNextBuffer(CreditBasedSequenceNumberingViewReader.java:160) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:227) ... 52 more > Fix the unexpected IOException during FileBufferReader#nextBuffer > ----------------------------------------------------------------- > > Key: FLINK-13100 > URL: https://issues.apache.org/jira/browse/FLINK-13100 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network > Reporter: zhijiang > Priority: Blocker > > In the implementation of FileBufferReader#nextBuffer, we expect the next > memory segment always available based on the assumption that the nextBuffer > call could only happen when the previous buffer was recycled before. > Otherwise it would throw an IOException in current implementation. > In fact, the above assumption is not making sense based on the credit-based > and zero-copy features in network. The detail processes are as follows: > * The netty thread finishes calling the channel.writeAndFlush() in > PartitionRequestQueue and adds a listener to handle the ChannelFuture later. > Before future done, the corresponding buffer is not recycled because of > zero-copy improvement. > * Before the previous future done, the netty thread could trigger next > writeAndFlush via processing addCredit message, then > FileBufferReader#nextBuffer would throw exception because of previous buffer > not recycled. > We thought of several ways for solving this potential bug: > * It does not trigger the next writeAndFlush before the previous future > done. To do so it has to maintain the future state and check it in relevant > actions. I wonder it might bring performance regression in network throughput > and bring extra state management. > * Adjust the implementation of current FileBufferReader. We ever regarded > the blocking partition view as always available based on the next buffer read > ahead, so it would be always added into available queue in > PartitionRequestQueue. Actually this next buffer ahead only simplifies the > process of BoundedBlockingSubpartitionReader#notifyDataAvailable. The view > availability could be judged based on available buffers in FileBufferReader > instead of next buffer ahead. When the buffer is recycled into > FileBufferReader after writeAndFlush done, it could call notifyDataAvailable > to add this view into available queue in PartitionRequestQueue. > I prefer the second way because it would not bring any bad impacts. -- This message was sent by Atlassian JIRA (v7.6.3#76005)