[
https://issues.apache.org/jira/browse/FLINK-29416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17609511#comment-17609511
]
Weijie Guo commented on FLINK-29416:
------------------------------------
It seems that direct memory is not enough, you can try to increase the size of
taskmanager.memory.task.off-heap.size
> One or more fetchers have encountered exception
> -----------------------------------------------
>
> Key: FLINK-29416
> URL: https://issues.apache.org/jira/browse/FLINK-29416
> Project: Flink
> Issue Type: Bug
> Reporter: liuchenhong
> Priority: Major
>
> One machine in the cluster goes offline, most tasks failover, and then it
> automatically restarts successfully. But the task of writing data to kudu
> failed to restart. The error is as follows:
>
> {code:java}
> //代码占位符
> 2022-09-26 16:06:30,634 WARN org.apache.flink.runtime.taskmanager.Task
> [] - KunlunRecLogOutputWindow (11/90)#6
> (9e23189bdc1ba38b19f922342819dab2) switched from RUNNING to FAILED with
> failure cause:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Error at remote task manager 'bjstream44.jd.163.org/10.196.24.76:39604'.
> at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:351)
> at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:240)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at
> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:745)
> Caused by:
> org.apache.flink.runtime.io.network.partition.ProducerFailedException:
> java.lang.RuntimeException: One or more fetchers have encountered exception
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:285)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:123)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:234)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:117)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:365)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1428)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:913)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:91)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> ... 3 more
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered
> exception
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:342)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> ... 1 more
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
> out-of-memory error has occurred. This can mean two things: either job(s)
> require(s) a larger size of JVM direct memory or there is a direct memory
> leak. The direct memory can be allocated by user code or some of its
> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
> configuration option should be increased. Flink framework and its
> dependencies also consume the direct memory, mostly for network
> communication. The most of network memory is managed by Flink and should not
> result in out-of-memory error. In certain special cases, in particular for
> jobs with high parallelism, the framework may require more direct memory
> which is not managed by Flink. In this case
> 'taskmanager.memory.framework.off-heap.size' configuration option should be
> increased. If the error persists then there is probably a direct memory leak
> in user code or some of its dependencies which has to be investigated and
> fixed. The task executor has to be shutdown...
> at java.nio.Bits.reserveMemory(Bits.java:693)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
> at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
> at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
> at
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:113)
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more {code}
>
>
> I don't know why and how to fix it.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)