[
https://issues.apache.org/jira/browse/FLINK-6515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16021275#comment-16021275
]
Robert Metzger commented on FLINK-6515:
---------------------------------------
Are you sure that you've re-build your user code using he correct Flink version?
The Kafka connector code is usually located in the user jar, so you need to
update both Flink and your user code.
> KafkaConsumer checkpointing fails because of ClassLoader issues
> ---------------------------------------------------------------
>
> Key: FLINK-6515
> URL: https://issues.apache.org/jira/browse/FLINK-6515
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.3.0
> Reporter: Aljoscha Krettek
> Assignee: Stephan Ewen
> Priority: Blocker
> Fix For: 1.3.0, 1.4.0
>
>
> A job with Kafka and checkpointing enabled fails with:
> {code}
> java.lang.Exception: Error while triggering checkpoint 1 for Source: Custom
> Source -> Map -> Sink: Unnamed (1/1)
> at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1195)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not perform checkpoint 1 for operator
> Source: Custom Source -> Map -> Sink: Unnamed (1/1).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:112)
> at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1184)
> ... 5 more
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator
> Source: Custom Source -> Map -> Sink: Unnamed (1/1).
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:520)
> ... 7 more
> Caused by: java.lang.RuntimeException: Could not copy instance of
> (KafkaTopicPartition{topic='test-input', partition=0},-1).
> at
> org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:54)
> at
> org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:32)
> at
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:71)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.<init>(DefaultOperatorStateBackend.java:368)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.deepCopy(DefaultOperatorStateBackend.java:380)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:191)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:392)
> ... 12 more
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> at
> org.apache.flink.util.InstantiationUtil.clone(InstantiationUtil.java:371)
> at
> org.apache.flink.util.InstantiationUtil.clone(InstantiationUtil.java:349)
> at
> org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:52)
> ... 18 more
> {code}
> The problem seems to be {{TypeSerializer.copy()}}, which uses the wrong
> ClassLoader. Until recently this was not used but recent changes around
> asynchronous checkpointing of operator state require deep copies of the
> operator {{ListState}} and thus call this method.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)