Sanjar Akhmedov created FLINK-3101:
--------------------------------------
Summary: Flink Kafka consumer crashes with NPE when it sees
deleted record
Key: FLINK-3101
URL: https://issues.apache.org/jira/browse/FLINK-3101
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 0.10.1, 1.0.0
Environment: Apache Flink 0.10.1 binary for Hadoop 2.6.0 with
Scala 2.10.
Reporter: Sanjar Akhmedov
Kafka allows a records to be deleted from the log by sending a record with key
and null payload. Consumers still can see those null values (deletes) before
they are compacted (delete retention point).
Flink Kafka consumer crashes with NPE when it sees such record:
{noformat}
Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception
at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:443)
{noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)