[ https://issues.apache.org/jira/browse/FLINK-5849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885719#comment-15885719 ]

ASF GitHub Bot commented on FLINK-5849:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3378#discussion_r103199655
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
    @@ -42,13 +43,7 @@
     
     import java.io.Serializable;
     import java.lang.reflect.Field;
    -import java.util.ArrayList;
    -import java.util.Arrays;
    -import java.util.Collections;
    -import java.util.HashMap;
    -import java.util.HashSet;
    -import java.util.List;
    -import java.util.Set;
    +import java.util.*;
    --- End diff --
    
    Yikes, second time :/
    I think there's a setting to disable star imports in IntelliJ; I'll try to use it!
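
For reference, the explicit imports that the wildcard collapsed (taken from the removed lines in the diff above; the exact set in the current file may of course differ) would be restored as:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;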


> Kafka Consumer checkpointed state may contain undefined offsets
> ---------------------------------------------------------------
>
>                 Key: FLINK-5849
>                 URL: https://issues.apache.org/jira/browse/FLINK-5849
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> This is a regression due to FLINK-4280.
> In FLINK-4280, all initial offset determination was refactored to be 
> consolidated at the start of {{AbstractFetcher#runFetchLoop}}. However, this 
> caused checkpoints that were triggered before the method was ever reached to 
> contain undefined partition offsets.
> Ref:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>     at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:392)
>     at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:209)
>     at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
>     at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:32)
>     at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest(KafkaConsumerTestBase.java:942)
>     at org.apache.flink.streaming.connectors.kafka.Kafka09ITCase.testMultipleSourcesOnePartition(Kafka09ITCase.java:76)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>     at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:915)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: Restoring from a checkpoint / savepoint, but found a partition state Partition: KafkaTopicPartition{topic='manyToOneTopic', partition=2}, KafkaPartitionHandle=manyToOneTopic-2, offset=(not set) that does not have a defined offset.
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(KafkaConsumerThread.java:133)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:113)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.createFetcher(FlinkKafkaConsumer09.java:182)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:275)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:668)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
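
As an illustration of the failure mode described above, here is a minimal, self-contained sketch; the class and field names are hypothetical simplifications and do not mirror Flink's actual implementation. The point is only that a checkpoint taken before the fetch loop has assigned starting offsets snapshots a "not set" sentinel, which is then rejected on restore:

{code}
// Hypothetical, simplified sketch of the race described in FLINK-5849;
// names and structure do not mirror Flink's real classes.
import java.util.HashMap;
import java.util.Map;

public class UndefinedOffsetSketch {

    // Sentinel meaning "no offset has been determined yet".
    static final long OFFSET_NOT_SET = Long.MIN_VALUE;

    public static void main(String[] args) {
        // The partition is known to the consumer, but the fetch loop has not
        // started yet, so no starting offset has been assigned.
        Map<String, Long> partitionOffsets = new HashMap<>();
        partitionOffsets.put("manyToOneTopic-2", OFFSET_NOT_SET);

        // A checkpoint triggered at this point snapshots the sentinel value.
        Map<String, Long> checkpointedState = new HashMap<>(partitionOffsets);

        // On restore, the undefined offset is rejected, mirroring the
        // IllegalArgumentException in the stack trace above.
        checkpointedState.forEach((partition, offset) -> {
            if (offset == OFFSET_NOT_SET) {
                throw new IllegalArgumentException(
                        "Restoring from a checkpoint / savepoint, but found a partition state for "
                                + partition + " that does not have a defined offset.");
            }
        });
    }
}
{code}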


