Jiyong Wang created FLINK-40040:
-----------------------------------
Summary: SchemaRegistry loses coordination responses when a
failure's cause is a user-classloader-only class (e.g. a JDBC driver
exception), causing the job to hang until rpcTimeout and enter a restart loop
Key: FLINK-40040
URL: https://issues.apache.org/jira/browse/FLINK-40040
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.6.0
Reporter: Jiyong Wang
When the schema-evolution coordinator ({{SchemaRegistry}}) completes a
coordination response exceptionally, the exception crosses the
operator-coordinator RPC boundary and is deserialized on the {{SchemaOperator}}
side by {{flink-rpc-akka}}, which uses an *isolated classloader*. If the
exception itself, or any throwable in its cause chain, is an instance of a class
that only exists in the user classloader, deserialization fails with
{{ClassNotFoundException}} and the response is silently dropped.
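The mechanism can be approximated in plain Java. The sketch below is not Flink or Pekko code. {{LoaderBoundObjectInputStream}} resolves every class against one fixed classloader, in the spirit of the {{ClassLoaderObjectInputStream.resolveClass}} frame in the production trace. A classloader with a {{null}} parent, which sees only JDK classes, stands in for the isolated RPC classloader. {{DriverOnlyException}}, {{LoaderBoundObjectInputStream}} and {{IsolatedReceiver}} are hypothetical names chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a driver exception that only the user classloader can see.
class DriverOnlyException extends RuntimeException {
    DriverOnlyException(String message) { super(message); }
}

// Resolves every class in the stream against one fixed classloader
// (illustrative analogue of a classloader-bound ObjectInputStream, not Pekko's source).
class LoaderBoundObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    LoaderBoundObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
        return Class.forName(desc.getName(), false, loader);
    }
}

class IsolatedReceiver {
    // Assumption: a null-parent loader approximates the isolated RPC classloader (JDK classes only).
    static final ClassLoader JDK_ONLY = new ClassLoader(null) {};

    /** Returns true if the failure survives a serialize/deserialize round trip on the isolated side. */
    static boolean canReceive(Throwable failure) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(failure);
            }
            try (ObjectInputStream in = new LoaderBoundObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()), JDK_ONLY)) {
                in.readObject();
            }
            return true;
        } catch (ClassNotFoundException e) {
            return false; // in the incident, this is where the coordination response is lost
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canReceive(new RuntimeException("failed", new IllegalStateException("closed"))));
        System.out.println(canReceive(new RuntimeException("failed", new DriverOnlyException("closed"))));
    }
}
```

{{canReceive}} returns {{true}} for a chain made only of JDK exceptions and {{false}} as soon as a user-only class appears as a cause; that {{false}} corresponds to the dropped response.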
Observed in production with Flink CDC 3.6.0 on Flink 1.20.3:
{noformat}
ERROR org.apache.pekko.remote.Remoting -
com.mysql.cj.exceptions.ConnectionIsClosedException
java.lang.ClassNotFoundException:
com.mysql.cj.exceptions.ConnectionIsClosedException
at org.apache.pekko.util.ClassLoaderObjectInputStream.resolveClass(...)
at ...MiscMessageSerializer.deserializeStatusFailure(...)
{noformat}
Because the coordination response never arrives,
{{SchemaOperator.sendRequestToCoordinator}} blocks on
{{responseFuture.get(rpcTimeout)}} and fails ~3 minutes later with a
*misleading* {{TimeoutException}}:
{noformat}
IllegalStateException: Failed to send request to coordinator:
SchemaChangeRequest{...AddColumnEvent...}
Caused by: java.util.concurrent.TimeoutException
at ...SchemaOperator.sendRequestToCoordinator(SchemaOperator.java:243)
{noformat}
The job then restarts, hits the same condition again, and only a *full job
restart* recovers -- a transient error is amplified into a permanent restart
loop, and the real cause is hidden behind a {{TimeoutException}}.
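The hang is ordinary {{CompletableFuture}} behaviour. The sketch below is not the actual {{SchemaOperator}} code. {{awaitResponse}} is a hypothetical helper that waits the way a bounded {{responseFuture.get(rpcTimeout)}} does, and the 200 ms timeout stands in for the roughly 3-minute wait seen in the incident. A future that is never completed can only end in a {{TimeoutException}} with no cause attached. A delivered exceptional completion surfaces the real failure at once.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutDemo {
    /** Hypothetical helper: bounded wait on a coordination response, reporting the outcome. */
    static String awaitResponse(CompletableFuture<String> responseFuture, long timeoutMillis) {
        try {
            return "response: " + responseFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TimeoutException"; // no cause attached: the real failure is invisible here
        } catch (ExecutionException e) {
            return "failed: " + e.getCause(); // the coordinator's failure, when it is delivered
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // The exceptional completion was lost in transit, so this future never completes.
        CompletableFuture<String> lost = new CompletableFuture<>();
        System.out.println(awaitResponse(lost, 200));

        // Had the failure been delivered, the caller would see the real cause immediately.
        CompletableFuture<String> delivered = new CompletableFuture<>();
        delivered.completeExceptionally(new IllegalStateException("connection is closed"));
        System.out.println(awaitResponse(delivered, 200));
    }
}
```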
h3. Root cause (confirmed)
{{SchemaRegistry.failJob}} passes the original {{Throwable}} to
{{completeExceptionally(...)}} and {{context.failJob(...)}}. When that
throwable (or any element of its cause chain) is a user-classloader-only type,
it cannot be deserialized by the isolated RPC classloader on the receiving side.
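When triaging similar failures, it can help to check which elements of a cause chain the receiving classloader cannot load. The helper below is a hypothetical diagnostic, not part of Flink or Flink CDC. {{JdbcDriverException}} stands in for the driver exception, and a classloader with a {{null}} parent approximates the isolated RPC classloader. It inspects only the cause chain itself, so suppressed exceptions or non-throwable fields that reference user classes would be missed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a JDBC driver exception that lives only in the user classloader.
class JdbcDriverException extends RuntimeException {
    JdbcDriverException(String message) { super(message); }
}

class CauseChainInspector {
    /**
     * Lists the classes in the cause chain that receiverLoader cannot resolve.
     * Only getCause() links are followed; suppressed exceptions and fields are not checked.
     */
    static List<String> unresolvableCauses(Throwable failure, ClassLoader receiverLoader) {
        List<String> missing = new ArrayList<>();
        // Identity set guards against cyclic cause chains.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = failure; t != null && seen.add(t); t = t.getCause()) {
            String name = t.getClass().getName();
            try {
                Class.forName(name, false, receiverLoader);
            } catch (ClassNotFoundException e) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        ClassLoader jdkOnly = new ClassLoader(null) {}; // assumption: sees only JDK classes
        Throwable failure = new IllegalStateException("schema change failed",
                new RuntimeException("table discovery failed",
                        new JdbcDriverException("connection is closed")));
        System.out.println(unresolvableCauses(failure, jdkOnly));
    }
}
```

For the chain in {{main}}, only {{JdbcDriverException}} is reported, because the two outer exceptions are JDK classes.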
h3. Trigger in this incident (inferred -- JM logs no longer available)
The {{ConnectionIsClosedException}} most likely originated from the
JobManager-side {{MySqlSourceEnumerator}} during table discovery / chunk
splitting (the only JM component that holds a MySQL connection), then
propagated across the coordinator RPC boundary. The exact producing site could
not be pinned down without JM logs, but the deserialization failure itself is
fully evidenced by the TaskManager stack above.
h3. Fix
Wrap the failure into {{org.apache.flink.util.SerializedThrowable}} in
{{SchemaRegistry.failJob}} before it crosses the RPC boundary.
{{SerializedThrowable}} carries the original exception as bytes plus a
stringified stack trace, so the receiving side can deserialize it without the
original class and still see the real cause. Failures caught in
{{runInEventLoop}} are also routed through {{failJob}}, so all exit paths share
the wrapping; this covers both the regular and distributed topologies.
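A minimal sketch of the wrapping idea, assuming no Flink classes on the classpath. {{PortableThrowable}} is a hypothetical, simplified stand-in for {{SerializedThrowable}}; the real class also exposes methods such as {{getOriginalErrorClassName()}} and {{deserializeError(ClassLoader)}}. {{RegistrySketch.failJob}} only mirrors the shape of the fix, not the actual {{SchemaRegistry}} code. The key property is that the wrapper's own fields are JDK types and no cause object is attached. The original therefore travels only as bytes and text, and a receiver that can load the wrapper class never needs the user class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for org.apache.flink.util.SerializedThrowable.
// Fields are JDK types only (String, byte[]) and no cause is attached, so a receiver
// that can load this class does not need any user classes to deserialize it.
class PortableThrowable extends Exception {
    private static final long serialVersionUID = 1L;

    private final String originalClassName;
    private final String fullStackTrace;
    private final byte[] originalBytes; // null if the original could not be serialized

    PortableThrowable(Throwable original) {
        super(original.toString());
        this.originalClassName = original.getClass().getName();
        StringWriter text = new StringWriter();
        original.printStackTrace(new PrintWriter(text)); // includes the "Caused by:" sections
        this.fullStackTrace = text.toString();
        this.originalBytes = trySerialize(original);
        setStackTrace(original.getStackTrace());
    }

    private static byte[] trySerialize(Throwable t) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(t);
        } catch (IOException e) {
            return null; // e.g. NotSerializableException somewhere in the chain
        }
        return bytes.toByteArray();
    }

    String getOriginalClassName() { return originalClassName; }
    String getFullStackTrace() { return fullStackTrace; }
    boolean hasOriginalBytes() { return originalBytes != null; }
}

// Hypothetical analogue of the fixed SchemaRegistry.failJob: every exit path wraps first.
class RegistrySketch {
    static void failJob(CompletableFuture<?> pendingResponse, Throwable failure) {
        Throwable portable =
                failure instanceof PortableThrowable ? failure : new PortableThrowable(failure);
        pendingResponse.completeExceptionally(portable);
        // The real method also hands the failure to context.failJob(...); omitted here.
    }
}
```

Skipping an already-wrapped failure keeps repeated calls from nesting wrappers, which matters once several exit paths funnel into the same method.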
h3. Reproduction
A minimal reproduction (also added as a unit test) serializes
{{RuntimeException(cause = <class visible only to a child classloader>)}} and
deserializes it with a classloader that cannot see that class:
* Without the fix: {{ClassNotFoundException}} -> response lost.
* With the fix ({{SerializedThrowable}}): deserializes successfully and
preserves the real cause as text.
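The reproduction can be approximated without Flink on the classpath. This is an illustrative sketch, not the unit test added with the fix. Instead of a child classloader, an {{ObjectInputStream}} that refuses to resolve one class name plays the receiver. {{UserOnlyException}} plays the user-only class, and {{TextOnlyThrowable}} is a hypothetical text-only stand-in for {{SerializedThrowable}}.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.UncheckedIOException;

class ReproTest {
    // Plays the role of a class that only the user (child) classloader can see.
    static class UserOnlyException extends RuntimeException {
        UserOnlyException(String message) { super(message); }
    }

    // Hypothetical text-only stand-in for SerializedThrowable: keeps no reference to the original.
    static class TextOnlyThrowable extends Exception {
        final String fullStackTrace;

        TextOnlyThrowable(Throwable original) {
            super(original.toString());
            StringWriter text = new StringWriter();
            original.printStackTrace(new PrintWriter(text));
            this.fullStackTrace = text.toString();
        }
    }

    /**
     * Serializes the failure, then deserializes it while refusing to resolve hiddenClass.
     * Returns null when deserialization fails with ClassNotFoundException (response lost).
     */
    static Throwable receive(Throwable failure, String hiddenClass) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(failure);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())) {
                        @Override
                        protected Class<?> resolveClass(ObjectStreamClass desc)
                                throws IOException, ClassNotFoundException {
                            if (desc.getName().equals(hiddenClass)) {
                                throw new ClassNotFoundException(desc.getName());
                            }
                            return super.resolveClass(desc);
                        }
                    }) {
                return (Throwable) in.readObject();
            }
        } catch (ClassNotFoundException e) {
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String hidden = UserOnlyException.class.getName();
        Throwable failure =
                new RuntimeException("schema change failed", new UserOnlyException("connection is closed"));
        System.out.println("without fix: " + receive(failure, hidden));
        System.out.println("with fix:    " + receive(new TextOnlyThrowable(failure), hidden));
    }
}
```

This matches the two bullets in the Reproduction section. {{receive}} returns {{null}} for the unwrapped failure. For the wrapped one, it returns a {{TextOnlyThrowable}} whose stack trace text still names {{UserOnlyException}} and its message.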