[ https://issues.apache.org/jira/browse/FLINK-25441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17466662#comment-17466662 ]
Lijie Wang edited comment on FLINK-25441 at 12/30/21, 2:42 AM:
---------------------------------------------------------------
Hi [~pnowojski], thanks for your detailed explanation.
> do you mean that [exactly this
> change|https://github.com/apache/flink/pull/17253/commits/43f4db7ade41eb0d5e052dcc81748d71740d540f#diff-b84174e55cb1999d99ad60cdeded7be20ff4978472bfc785c5a77b6270f47b56R799-R801]
> is causing the problems?
Yes, I think that change exposes this problem, although it may not be the root
cause.
> `ProducerFailedException` should be handled as `CancelTaskException`
If so, I think the "java.util.concurrent.TimeoutException: Buffer request
timeout, this means there is a fierce contention of the batch shuffle read
memory, please increase
'taskmanager.memory.framework.off-heap.batch-shuffle.size'" error should
probably be wrapped as {{PartitionException}} instead of
{{ProducerFailedException}}, because the producer (upstream task) was already
FINISHED. (This problem appeared in batch jobs using sort shuffle.) cc [~kevin.cyj]
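A minimal sketch of the proposed wrapping, using hypothetical stand-in classes rather than Flink's real {{PartitionException}} and {{ProducerFailedException}}. It assumes {{PartitionException}} is an {{IOException}} subtype and not a {{CancelTaskException}}; the helper {{wrapReadError}} and its {{producerFinished}} flag are illustrative names only, not existing Flink API. The point it shows: if the buffer request timeout of a FINISHED producer is wrapped in something outside the {{CancelTaskException}} hierarchy, a cause-chain search no longer classifies the consumer failure as a cancellation.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-ins that only mirror the hierarchy discussed here; not Flink's classes.
class CancelTaskException extends RuntimeException {
    CancelTaskException(Throwable cause) { super(cause); }
}

class ProducerFailedException extends CancelTaskException {
    ProducerFailedException(Throwable cause) { super(cause); }
}

// Assumption: a plain IOException subtype, i.e. NOT a CancelTaskException.
class PartitionException extends IOException {
    PartitionException(String message, Throwable cause) { super(message, cause); }
}

class ReadErrorWrappingSketch {
    // Hypothetical helper: choose the wrapper for an error raised while serving a subpartition.
    static Throwable wrapReadError(Throwable error, boolean producerFinished) {
        if (producerFinished) {
            // The producer already finished, so the failure is in reading its data:
            // report it as a partition problem instead of a producer failure.
            return new PartitionException("Failed to read finished partition", error);
        }
        // Otherwise keep the existing wrapping (assumption for this sketch).
        return new ProducerFailedException(error);
    }

    // True if a CancelTaskException appears anywhere in the cause chain.
    static boolean isCancellation(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof CancelTaskException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        TimeoutException timeout = new TimeoutException("Buffer request timeout");
        System.out.println("producer FINISHED -> cancellation=" + isCancellation(wrapReadError(timeout, true)));
        System.out.println("producer failed   -> cancellation=" + isCancellation(wrapReadError(timeout, false)));
    }
}
```

Running {{main}} prints {{cancellation=false}} for the FINISHED case and {{cancellation=true}} for the other, so only the latter would still drive the consumer to CANCELED.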
> ProducerFailedException will cause the task status to switch from RUNNING to
> CANCELED, which will cause the job to hang.
> -----------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-25441
> URL: https://issues.apache.org/jira/browse/FLINK-25441
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.15.0
> Reporter: Lijie Wang
> Priority: Major
>
> The {{ProducerFailedException}} extends {{CancelTaskException}}, which
> causes the task status to switch from RUNNING to CANCELED. As described in
> FLINK-17726, if a task is directly CANCELED by the TaskManager due to its own
> runtime issue, the task will not be recovered by the JobManager (JM), and
> thus the job will hang.
> Note that this did not cause problems before FLINK-24182 (which unified the
> failureCause handling and changed the check of CancelTaskException from
> "{{instanceof CancelTaskException}}" to "{{ExceptionUtils.findThrowable}}"),
> because the {{ProducerFailedException}} is always wrapped by
> {{RemoteTransportException}}, so the top-level {{instanceof}} check never
> matched it.
> An example log is shown below:
> {code:java}
> 2021-12-23 21:20:14,965 DEBUG org.apache.flink.runtime.taskmanager.Task [] - MultipleInput[945] [Source: HiveSource-tpcds_bin_orc_10000.catalog_sales, Source: HiveSource-tpcds_bin_orc_10000.store_sales, Source: HiveSource-tpcds_bin_orc_10000.catalog_sales, Source: HiveSource-tpcds_bin_orc_10000.store_sales, Source: HiveSource-tpcds_bin_orc_10000.store_sales, Source: HiveSource-tpcds_bin_orc_10000.item, Source: HiveSource-tpcds_bin_orc_10000.web_sales, Source: HiveSource-tpcds_bin_orc_10000.web_sales] -> Calc[885] (143/1024)#0 (8a883116ab601dd5b9ad5d2717d18918) switched from RUNNING to CANCELED due to CancelTaskException:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'k28b09250.eu95sqa.tbsite.net/100.69.96.154:47459'.
> at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:301)
> at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:190)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
> at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
> at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
> at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException: java.util.concurrent.TimeoutException: Buffer request timeout, this means there is a fierce contention of the batch shuffle read memory, please increase 'taskmanager.memory.framework.off-heap.batch-shuffle.size'.
> at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:285)
> at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:123)
> at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:234)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
> at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:117)
> at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:365)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
> at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1428)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:913)
> at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:91)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
> at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:756)
> Caused by: java.util.concurrent.TimeoutException: Buffer request timeout, this means there is a fierce contention of the batch shuffle read memory, please increase 'taskmanager.memory.framework.off-heap.batch-shuffle.size'.
> at org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionReadScheduler.allocateBuffers(SortMergeResultPartitionReadScheduler.java:168)
> at org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionReadScheduler.run(SortMergeResultPartitionReadScheduler.java:139)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
> {code}
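The mechanism described in the issue (a {{CancelTaskException}} subclass hidden behind a {{RemoteTransportException}} wrapper, and the change in FLINK-24182 from an {{instanceof}} check to a cause-chain search) can be sketched as follows. All classes here are hypothetical stand-ins that only mirror the hierarchy named in the issue; {{findThrowable}} is a self-contained reimplementation in the spirit of Flink's {{ExceptionUtils.findThrowable}}, and {{recoveredByJobManager}} is a deliberately simplified reading of FLINK-17726 (only FAILED tasks are recovered), not Flink's scheduler logic.

```java
import java.util.Optional;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-ins that only mirror the hierarchy described in the issue; not Flink's classes.
class CancelTaskException extends RuntimeException {
    CancelTaskException(Throwable cause) { super(cause); }
}

// As in the issue: ProducerFailedException extends CancelTaskException.
class ProducerFailedException extends CancelTaskException {
    ProducerFailedException(Throwable cause) { super(cause); }
}

// Stand-in for the consumer-side wrapper at the top of the log.
class RemoteTransportException extends RuntimeException {
    RemoteTransportException(String message, Throwable cause) { super(message, cause); }
}

enum TaskState { CANCELED, FAILED }

class CancelCheckSketch {
    // Pre-FLINK-24182 style: only the top-level exception type is inspected.
    static TaskState classifyWithInstanceof(Throwable failure) {
        return failure instanceof CancelTaskException ? TaskState.CANCELED : TaskState.FAILED;
    }

    // Walks the cause chain and returns the first throwable of the requested type.
    static <T extends Throwable> Optional<T> findThrowable(Throwable failure, Class<T> searchType) {
        Throwable current = failure;
        while (current != null) {
            if (searchType.isInstance(current)) {
                return Optional.of(searchType.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    // Post-FLINK-24182 style: a CancelTaskException anywhere in the chain means CANCELED.
    static TaskState classifyWithFindThrowable(Throwable failure) {
        return findThrowable(failure, CancelTaskException.class).isPresent()
                ? TaskState.CANCELED
                : TaskState.FAILED;
    }

    // Simplified assumption: only tasks reported as FAILED trigger recovery by the JM.
    static boolean recoveredByJobManager(TaskState state) {
        return state == TaskState.FAILED;
    }

    public static void main(String[] args) {
        Throwable failure = new RemoteTransportException(
                "Error at remote task manager",
                new ProducerFailedException(new TimeoutException("Buffer request timeout")));

        TaskState before = classifyWithInstanceof(failure);
        TaskState after = classifyWithFindThrowable(failure);
        System.out.println("before: " + before + ", recovered=" + recoveredByJobManager(before));
        System.out.println("after:  " + after + ", recovered=" + recoveredByJobManager(after));
    }
}
```

With the wrapped failure from the log, the old check yields FAILED (recovered) while the cause-chain search yields CANCELED (not recovered), which matches the hang described above.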