[ https://issues.apache.org/jira/browse/FLINK-5849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15876139#comment-15876139 ]

ASF GitHub Bot commented on FLINK-5849:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3378

    [FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset determination to open()

    This PR fixes a regression introduced by the recently merged #2509 (FLINK-4280).
    The new start position feature added in #2509 assumed that on restore, all offsets are defined. This was not true if the restored checkpoint was taken before the fetcher was ever initialized or run.
    
    This PR fixes that with the following changes:
    1. Move the start position determination logic to `open()`. This ensures that when `snapshotState()` is called, we always have defined offsets.
    2. Introduce special "magic offset values" that represent whether a partition is to be started from `EARLIEST`, `LATEST`, or `GROUP_OFFSETS`. These values are set as placeholders in `open()`. The consumer follows a lazy evaluation approach, replacing the magic values with actual offsets only when the fetcher actually starts running.
    
    Therefore, with this PR, if a checkpoint happens before a fetcher has fully started consuming all of its subscribed partitions, the checkpointed state will at least contain the "magic offset values" instead of undefined offsets as before.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-5849

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3378.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3378
    
----
commit 7e7bf1d106d4dc0d24fa6746e94ccdadbc06088e
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2017-02-21T15:05:32Z

    [FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset determination to open()

----


> Kafka Consumer checkpointed state may contain undefined offsets
> ---------------------------------------------------------------
>
>                 Key: FLINK-5849
>                 URL: https://issues.apache.org/jira/browse/FLINK-5849
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> This is a regression due to FLINK-4280.
> In FLINK-4280, all initial offset determination was refactored to be 
> consolidated at the start of {{AbstractFetcher#runFetchLoop}}. However, this 
> caused checkpoints that were triggered before the method was ever reached to 
> contain undefined partition offsets.
> Ref:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>     at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:392)
>     at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:209)
>     at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
>     at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:32)
>     at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest(KafkaConsumerTestBase.java:942)
>     at org.apache.flink.streaming.connectors.kafka.Kafka09ITCase.testMultipleSourcesOnePartition(Kafka09ITCase.java:76)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>     at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:915)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: Restoring from a checkpoint / savepoint, but found a partition state Partition: KafkaTopicPartition{topic='manyToOneTopic', partition=2}, KafkaPartitionHandle=manyToOneTopic-2, offset=(not set) that does not have a defined offset.
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(KafkaConsumerThread.java:133)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:113)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.createFetcher(FlinkKafkaConsumer09.java:182)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:275)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:668)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
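
The `IllegalArgumentException` above comes from a restore-time sanity check in the consumer code path (see `KafkaConsumerThread` in the trace). The following is only a rough, hypothetical illustration of that kind of check; the class, method, and constant names are not Flink's actual API.

{code}
// Hypothetical illustration of the restore-time validation that fails above.
// OFFSET_NOT_SET and all names here are illustrative, not Flink's API.
import java.util.Map;

public class RestoredStateValidationSketch {

    // Hypothetical marker for "offset not set"; not the value Flink actually uses.
    static final long OFFSET_NOT_SET = Long.MIN_VALUE;

    // Fails fast if a restored partition state carries no usable offset, which is
    // exactly the situation the fix prevents by pre-assigning sentinels in open().
    static void validateRestoredOffsets(Map<String, Long> restoredOffsets) {
        for (Map.Entry<String, Long> entry : restoredOffsets.entrySet()) {
            if (entry.getValue() == OFFSET_NOT_SET) {
                throw new IllegalArgumentException(
                        "Restoring from a checkpoint / savepoint, but found a partition state "
                                + entry.getKey() + " that does not have a defined offset.");
            }
        }
    }
}
{code}

With the magic offset values assigned in `open()`, restored state never carries an undefined offset, so a check of this kind no longer fails.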



