[
https://issues.apache.org/jira/browse/FLINK-5849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874714#comment-15874714
]
Tzu-Li (Gordon) Tai edited comment on FLINK-5849 at 2/20/17 3:50 PM:
---------------------------------------------------------------------
Some background context on the problem:
In FLINK-4280, the logic that the fetcher runs at the start of
{{runFetchLoop()}} was changed to:
{code}
if (isRestored) {
    // just use the offsets in the restored state as starting position
} else {
    // find out the starting offsets based on StartupMode (either EARLIEST,
    // LATEST, or GROUP_OFFSETS), and set the partition states with the
    // discovered start offsets so that the state has defined offsets
}
{code}
So, the change also assumed that restored state never contains undefined
offsets. As pointed out in the description of this JIRA, this does not hold
if a checkpoint was taken before {{runFetchLoop()}} was reached.
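To make the race concrete, here is a toy model of it in plain Java. It is not the connector code: the class, the method names and the {{OFFSET_NOT_SET}} sentinel are all made up for illustration, and {{resolveStartOffsets()}} merely stands in for what happens at the start of {{runFetchLoop()}}.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of the race; all names here are hypothetical, not Flink APIs. */
class UndefinedOffsetSketch {
    // Hypothetical sentinel standing in for "offset not set".
    static final long OFFSET_NOT_SET = Long.MIN_VALUE;

    // Partition name -> current offset; every partition starts out undefined.
    final Map<String, Long> partitionOffsets = new LinkedHashMap<>();

    UndefinedOffsetSketch(String... partitions) {
        for (String p : partitions) {
            partitionOffsets.put(p, OFFSET_NOT_SET);
        }
    }

    // Stands in for the non-restored branch at the start of runFetchLoop().
    void resolveStartOffsets(long startOffset) {
        partitionOffsets.replaceAll((partition, offset) -> startOffset);
    }

    // Stands in for a checkpoint: copies whatever the state holds right now.
    Map<String, Long> snapshot() {
        return new LinkedHashMap<>(partitionOffsets);
    }

    static boolean hasUndefinedOffsets(Map<String, Long> state) {
        return state.containsValue(OFFSET_NOT_SET);
    }
}
```

A snapshot taken before {{resolveStartOffsets()}} still contains the sentinel, while one taken afterwards does not. That first kind of snapshot is what the restore path currently rejects.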
The approaches I see for fixing this:
1. The faster fix - also let the {{if (isRestored)}} branch handle states that
don't have defined offsets.
2. Rework the life cycle of {{AbstractFetcher}}. We should instantiate
{{AbstractFetcher}} in the {{open()}} method of the UDF, and let the startup
offset determination happen in the constructor of {{AbstractFetcher}}.
This ensures that there will always be defined offsets when checkpointing
happens.
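A rough sketch of the idea behind option (2), again as a toy model rather than the real {{AbstractFetcher}}: the class name, the {{startupLookup}} parameter and the sentinel are assumptions for illustration only. The point is just that offsets are resolved during construction, so no snapshot can observe the undefined sentinel.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

/** Toy model of option (2); all names here are hypothetical, not Flink APIs. */
class EagerFetcherSketch {
    // Hypothetical sentinel standing in for "offset not set".
    static final long OFFSET_NOT_SET = Long.MIN_VALUE;

    private final Map<String, Long> offsets = new LinkedHashMap<>();

    // startupLookup stands in for resolving a partition's start offset from
    // the startup mode (EARLIEST, LATEST, or GROUP_OFFSETS).
    EagerFetcherSketch(List<String> partitions, ToLongFunction<String> startupLookup) {
        for (String partition : partitions) {
            // Resolved here, before any snapshot can possibly be taken.
            offsets.put(partition, startupLookup.applyAsLong(partition));
        }
    }

    // Stands in for a checkpoint: copies the current offsets.
    Map<String, Long> snapshot() {
        return new LinkedHashMap<>(offsets);
    }
}
```

Since the object cannot exist without resolved offsets, the restore branch would not need any special handling for undefined ones.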
Option (2) will be more work, but I prefer it over (1) because it seems to be
the more proper fix.
Option (1) will lead to more complicated start position determination logic
and also make it less self-contained, since for partition states with
undefined offsets, we need to fall back to the {{StartupMode}} for that
partition. This will be a problem for the {{LATEST}} startup mode - we would
be using the latest record *at the time the job was restored with that
undefined offset*, and not the latest record at the time of the actual first
execution of the job.
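The {{LATEST}} concern can be illustrated with a small sketch of what the fallback in option (1) would amount to. The method and the sentinel are hypothetical, and the offsets in the usage note below are invented numbers.

```java
/** Toy model of the option (1) fallback; names are hypothetical, not Flink APIs. */
class LatestFallbackSketch {
    // Hypothetical sentinel standing in for "offset not set".
    static final long OFFSET_NOT_SET = Long.MIN_VALUE;

    // On restore, keep a defined offset as-is; for an undefined one, fall back
    // to the LATEST startup mode, which can only see the latest offset *now*.
    static long startOffsetOnRestore(long restoredOffset, long latestOffsetAtRestore) {
        return restoredOffset == OFFSET_NOT_SET ? latestOffsetAtRestore : restoredOffset;
    }
}
```

Suppose the latest offset was 100 when the job first started, and has grown to 250 by the time the job is restored. {{startOffsetOnRestore(OFFSET_NOT_SET, 250)}} yields 250, so the records from 100 up to 249 would never be read by the restored job, even though a job that had resolved {{LATEST}} on its first execution would have started at 100.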
> Kafka Consumer checkpointed state may contain undefined offsets
> ---------------------------------------------------------------
>
> Key: FLINK-5849
> URL: https://issues.apache.org/jira/browse/FLINK-5849
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Critical
>
> This is a regression due to FLINK-4280.
> In FLINK-4280, all initial offset determination was refactored to be
> consolidated at the start of {{AbstractFetcher#runFetchLoop}}. However, this
> caused checkpoints that were triggered before the method was ever reached to
> contain undefined partition offsets.
> Ref:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:392)
> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:209)
> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
> at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:32)
> at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest(KafkaConsumerTestBase.java:942)
> at org.apache.flink.streaming.connectors.kafka.Kafka09ITCase.testMultipleSourcesOnePartition(Kafka09ITCase.java:76)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:915)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: Restoring from a checkpoint / savepoint, but found a partition state Partition: KafkaTopicPartition{topic='manyToOneTopic', partition=2}, KafkaPartitionHandle=manyToOneTopic-2, offset=(not set) that does not have a defined offset.
> at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(KafkaConsumerThread.java:133)
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:113)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.createFetcher(FlinkKafkaConsumer09.java:182)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:275)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:668)
> at java.lang.Thread.run(Thread.java:745)
> {code}