Jan Gurda created FLINK-35542:
---------------------------------
Summary: ClassNotFoundException when deserializing CheckpointedOffset
Key: FLINK-35542
URL: https://issues.apache.org/jira/browse/FLINK-35542
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Affects Versions: jdbc-3.1.2
Environment: Flink 1.19.0
Flink JDBC Connector 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca)
JDK 11 (Temurin)
Reporter: Jan Gurda
Fix For: jdbc-3.2.0
I use the latest flink-connector-jdbc code from the main branch, which is currently
3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca).
When a job is interrupted while reading data from the JDBC source (for example,
by a TaskManager outage), it cannot recover because of the following exception:
{code:java}
java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
    at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:71)
    at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:34)
    at org.apache.flink.connector.base.source.hybrid.HybridSourceSplit.unwrapSplit(HybridSourceSplit.java:122)
    at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(HybridSourceReader.java:158)
    at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(HybridSourceReader.java:247)
    at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.handleSourceEvents(HybridSourceReader.java:186)
    at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:571)
    at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Unknown Source)
    at java.base/java.io.ObjectInputStream.resolveClass(Unknown Source)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:92)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
    at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserializeJdbcSourceSplit(JdbcSourceSplitSerializer.java:109)
    at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:69)
    ... 22 more
{code}
In our deployment, we embed the JDBC connector classes in the job JAR file.
This means that the class
org.apache.flink.connector.jdbc.source.split.CheckpointedOffset is visible only
to the _FlinkUserCodeClassLoader_ and not to the _AppClassLoader_. I believe
the problem is in the following code snippet, which deserializes the offset with
the class loader of the JDK's _DataInputStream_ class. Because _DataInputStream_
is a JDK class, its class loader cannot see classes packaged in the job JAR:
{code:java}
public JdbcSourceSplit deserializeJdbcSourceSplit(DataInputStream in)
        throws IOException, ClassNotFoundException {
    // ....
    // Some lines skipped
    CheckpointedOffset chkOffset =
            InstantiationUtil.deserializeObject(
                    chkOffsetBytes, in.getClass().getClassLoader());
    return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
}
{code}
If I change it to the following, everything works as expected:
{code:java}
public JdbcSourceSplit deserializeJdbcSourceSplit(DataInputStream in)
        throws IOException, ClassNotFoundException {
    // ....
    // Some lines skipped
    CheckpointedOffset chkOffset =
            InstantiationUtil.deserializeObject(
                    chkOffsetBytes, CheckpointedOffset.class.getClassLoader());
    return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
}
{code}
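To illustrate the difference between the two class loader lookups outside of Flink, here is a minimal standalone sketch. It is not connector code: the class ClassLoaderLookupSketch, the stand-in Offset class, and the LoaderAwareInputStream helper are all hypothetical names made up for this example. The helper only approximates the idea of resolving classes through an explicitly supplied loader and falling back to the JDK default when none is given; I am assuming that this is roughly what happens inside InstantiationUtil, based on the stack trace above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class ClassLoaderLookupSketch {

    /** Hypothetical stand-in for a connector class that ships in the job JAR. */
    public static class Offset implements Serializable {
        private static final long serialVersionUID = 1L;
        public final long value;

        public Offset(long value) {
            this.value = value;
        }
    }

    /** Resolves classes through the given loader, or the JDK default if it is null. */
    static class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            if (loader != null) {
                return Class.forName(desc.getName(), false, loader);
            }
            // No explicit loader: fall back to the JDK's default resolution.
            return super.resolveClass(desc);
        }
    }

    /** Class loader obtained the same way as in the problematic snippet. */
    public static ClassLoader jdkStreamLoader() {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(new byte[0]));
        return in.getClass().getClassLoader();
    }

    public static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    public static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderAwareInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // JDK classes are loaded by the bootstrap loader, which is reported as null.
        System.out.println("JDK stream loader is null: " + (jdkStreamLoader() == null));

        // The loader that defined the class being restored can always resolve it.
        ClassLoader owner = Offset.class.getClassLoader();
        Offset restored = (Offset) deserialize(serialize(new Offset(42L)), owner);
        System.out.println("Restored offset: " + restored.value);
    }
}
```

In a single-class-loader JVM like this sketch, both lookups would succeed, so it does not reproduce the failure itself. It only shows that the loader taken from the JDK stream carries no knowledge of user classes, while the loader taken from the target class is by definition able to resolve it.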