[ https://issues.apache.org/jira/browse/FLINK-5849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885704#comment-15885704 ]
ASF GitHub Bot commented on FLINK-5849: --------------------------------------- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/3378#discussion_r103196367 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -330,8 +315,49 @@ public void cancel() { public void open(Configuration configuration) { --- End diff -- I wonder if it makes sense to move the method above the run() method. Then its more logical going through the source code. > Kafka Consumer checkpointed state may contain undefined offsets > --------------------------------------------------------------- > > Key: FLINK-5849 > URL: https://issues.apache.org/jira/browse/FLINK-5849 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Reporter: Tzu-Li (Gordon) Tai > Assignee: Tzu-Li (Gordon) Tai > Priority: Critical > > This is a regression due to FLINK-4280. > In FLINK-4280, all initial offset determination was refactored to be > consolidated at the start of {{AbstractFetcher#runFetchLoop}}. However, this > caused checkpoints that were triggered before the method was ever reached to > contain undefined partition offsets. > Ref: > {code} > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:392) > at > org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:209) > at > org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173) > at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:32) > at > org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest(KafkaConsumerTestBase.java:942) > at > org.apache.flink.streaming.connectors.kafka.Kafka09ITCase.testMultipleSourcesOnePartition(Kafka09ITCase.java:76) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:915) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalArgumentException: Restoring from a checkpoint / > savepoint, but found a partition state Partition: > KafkaTopicPartition{topic='manyToOneTopic', partition=2}, > KafkaPartitionHandle=manyToOneTopic-2, offset=(not set) that does not have a > defined offset. > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(KafkaConsumerThread.java:133) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:113) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.createFetcher(FlinkKafkaConsumer09.java:182) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:275) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:668) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)