zhaoshijie created FLINK-10721:
----------------------------------
Summary: kafkaFetcher runFetchLoop throw exception will cause
follow-up code not execute in FlinkKafkaConsumerBase run method
Key: FLINK-10721
URL: https://issues.apache.org/jira/browse/FLINK-10721
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.6.2
Reporter: zhaoshijie
In FlinkKafkaConsumerBase run method on line 721(master branch), if
kafkaFetcher.runFetchLoop() throw exception(by discoveryLoopThread throw
exception then finally execute cancel method, cancel method will execute
kafkaFetcher.cancel, this implemented Kafka09Fetcher will execute
handover.close, then result in handover.pollNext throw ClosedException),then
next code will not execute,especially discoveryLoopError not be throwed,so,
real culprit exception will be Swallowed.
failed log like this:
{code:java}
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by:
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
at
org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695)
at java.lang.Thread.run(Thread.java:745)
{code}
Shoud we modify it as follows?
{code:java}
{code}
try {
kafkaFetcher.runFetchLoop();
} catch (Exception e) {
// if discoveryLoopErrorRef not null ,we should
throw real culprit exception
if (discoveryLoopErrorRef.get() != null){
throw new
RuntimeException(discoveryLoopErrorRef.get());
} else {
throw e;
}
}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)