[
https://issues.apache.org/jira/browse/FLINK-35542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jan Gurda updated FLINK-35542:
------------------------------
Affects Version/s: jdbc-3.2.0
> ClassNotFoundException when deserializing CheckpointedOffset
> ------------------------------------------------------------
>
> Key: FLINK-35542
> URL: https://issues.apache.org/jira/browse/FLINK-35542
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: jdbc-3.2.0, jdbc-3.1.2
> Environment: Flink 1.19.0
> Flink JDBC Connector 3.2-SNAPSHOT (commit
> 2defbbcf4fc550a76dd9c664e1eed7d261e028ca)
> JDK 11 (Temurin)
> Reporter: Jan Gurda
> Priority: Major
> Labels: pull-request-available
> Fix For: jdbc-3.3.0
>
>
> I am using the latest flink-connector-jdbc code from the main branch, which
> is currently 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca).
>
> When a job is interrupted while reading data from the JDBC source (for
> example, by a TaskManager outage), it cannot recover because of the following
> exception:
> {code:java}
> java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
>     at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:71)
>     at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:34)
>     at org.apache.flink.connector.base.source.hybrid.HybridSourceSplit.unwrapSplit(HybridSourceSplit.java:122)
>     at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(HybridSourceReader.java:158)
>     at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(HybridSourceReader.java:247)
>     at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.handleSourceEvents(HybridSourceReader.java:186)
>     at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:571)
>     at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
>     at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>     at java.base/java.lang.Class.forName0(Native Method)
>     at java.base/java.lang.Class.forName(Unknown Source)
>     at java.base/java.io.ObjectInputStream.resolveClass(Unknown Source)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:92)
>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
>     at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserializeJdbcSourceSplit(JdbcSourceSplitSerializer.java:109)
>     at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:69)
>     ... 22 more {code}
>
> In our deployment, we embed the JDBC connector classes in the job JAR file.
> This means that the class
> org.apache.flink.connector.jdbc.source.split.CheckpointedOffset is visible
> only to the _FlinkUserCodeClassLoader_ and not to the _AppClassLoader_. I
> believe the problem is in the following code snippet, which uses the class
> loader of the JDK's _DataInputStream_ class:
> {code:java}
> public JdbcSourceSplit deserializeJdbcSourceSplit(DataInputStream in)
>         throws IOException, ClassNotFoundException {
>     // ....
>     // Some lines skipped
>     CheckpointedOffset chkOffset =
>             InstantiationUtil.deserializeObject(
>                     chkOffsetBytes, in.getClass().getClassLoader());
>     return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
> } {code}
> If I change it to the following:
> {code:java}
> public JdbcSourceSplit deserializeJdbcSourceSplit(DataInputStream in)
>         throws IOException, ClassNotFoundException {
>     // ....
>     // Some lines skipped
>     CheckpointedOffset chkOffset =
>             InstantiationUtil.deserializeObject(
>                     chkOffsetBytes, CheckpointedOffset.class.getClassLoader());
>     return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
> } {code}
> With this change, everything works as expected.
>
>
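The class loader mismatch described above can be reproduced without Flink. The following is a minimal sketch using only the JDK, not the connector's actual code. Offset is a hypothetical stand-in for CheckpointedOffset, LoaderAwareInputStream is a simplified stand-in for the class-loader-aware stream that appears in the stack trace, and the parent of the system class loader is assumed to play the role of a loader that cannot see classes shipped in the job JAR. It also prints the loader of DataInputStream, which is null because JDK core classes come from the bootstrap loader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClassLoaderChoiceDemo {

    /** Hypothetical stand-in for CheckpointedOffset. */
    static class Offset implements Serializable {
        private static final long serialVersionUID = 1L;
        final long value;

        Offset(long value) {
            this.value = value;
        }
    }

    /** Simplified stream that resolves classes only through the given loader. */
    static class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    /** Serializes a value with standard Java serialization. */
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns true if the bytes can be deserialized using the given loader. */
    static boolean canDeserialize(byte[] data, ClassLoader loader) {
        try (ObjectInputStream in =
                new LoaderAwareInputStream(new ByteArrayInputStream(data), loader)) {
            in.readObject();
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(new Offset(42L));

        // JDK core classes such as DataInputStream come from the bootstrap
        // loader, which getClassLoader() reports as null.
        System.out.println("DataInputStream loader: " + DataInputStream.class.getClassLoader());

        // Assumption: the parent of the system loader stands in for any
        // loader that cannot see classes shipped with the job.
        ClassLoader narrowLoader = ClassLoader.getSystemClassLoader().getParent();
        System.out.println("narrow loader works: " + canDeserialize(data, narrowLoader));

        // The loader that defined Offset can always resolve Offset.
        ClassLoader ownerLoader = Offset.class.getClassLoader();
        System.out.println("owner loader works: " + canDeserialize(data, ownerLoader));
    }
}
```

The sketch suggests why the proposed change helps: the loader that defined a class can always resolve that class, whereas the loader of an unrelated JDK type says nothing about where connector classes live. Whether other classes reachable from the serialized object need the same treatment is not covered here.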