[ https://issues.apache.org/jira/browse/FLINK-29417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martijn Visser closed FLINK-29417. ---------------------------------- Resolution: Duplicate > One or more fetchers have encountered exception > ----------------------------------------------- > > Key: FLINK-29417 > URL: https://issues.apache.org/jira/browse/FLINK-29417 > Project: Flink > Issue Type: Bug > Reporter: liuchenhong > Priority: Major > > One machine in the cluster goes offline, most tasks failover, and then it > automatically restarts successfully. But the task of writing data to kudu > failed to restart. The error is as follows: > > {code:java} > //代码占位符 > 2022-09-26 16:06:30,634 WARN org.apache.flink.runtime.taskmanager.Task > [] - KunlunRecLogOutputWindow (11/90)#6 > (9e23189bdc1ba38b19f922342819dab2) switched from RUNNING to FAILED with > failure cause: > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: > Error at remote task manager '****/***:**'. > at > org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:351) > at > org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:240) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) > at > org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) > at > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) > at > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) > at > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) > at java.lang.Thread.run(Thread.java:745) > Caused by: > org.apache.flink.runtime.io.network.partition.ProducerFailedException: > java.lang.RuntimeException: One or more fetchers have encountered exception > at > org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:285) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:123) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:234) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324) > at > org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:117) > at > org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1428) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:913) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:91) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) > at > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) > ... 3 more > Caused by: java.lang.RuntimeException: One or more fetchers have encountered > exception > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:342) > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > ... 1 more > Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct > out-of-memory error has occurred. This can mean two things: either job(s) > require(s) a larger size of JVM direct memory or there is a direct memory > leak. The direct memory can be allocated by user code or some of its > dependencies. In this case 'taskmanager.memory.task.off-heap.size' > configuration option should be increased. Flink framework and its > dependencies also consume the direct memory, mostly for network > communication. The most of network memory is managed by Flink and should not > result in out-of-memory error. In certain special cases, in particular for > jobs with high parallelism, the framework may require more direct memory > which is not managed by Flink. In this case > 'taskmanager.memory.framework.off-heap.size' configuration option should be > increased. If the error persists then there is probably a direct memory leak > in user code or some of its dependencies which has to be investigated and > fixed. The task executor has to be shutdown... > at java.nio.Bits.reserveMemory(Bits.java:693) > at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) > at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) > at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174) > at sun.nio.ch.IOUtil.read(IOUtil.java:195) > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) > at > org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103) > at > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117) > at > org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424) > at > org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385) > at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572) > at org.apache.kafka.common.network.Selector.poll(Selector.java:483) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) > at > org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:113) > at > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more {code} > > > I don't know why and how to fix it. -- This message was sent by Atlassian Jira (v8.20.10#820010)