ASF GitHub Bot updated FLINK-40040:
-----------------------------------
Labels: pull-request-available (was: )
> SchemaRegistry loses coordination responses when a failure's cause is a
> user-classloader-only class (e.g. a JDBC driver exception), causing the job
> to hang until rpcTimeout and enter a restart loop
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-40040
> URL: https://issues.apache.org/jira/browse/FLINK-40040
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.6.0
> Reporter: Jiyong Wang
> Priority: Major
> Labels: pull-request-available
>
> When the schema-evolution coordinator ({{SchemaRegistry}}) completes a
> coordination response exceptionally, the exception crosses the
> operator-coordinator RPC boundary and is deserialized on the
> {{SchemaOperator}} side by {{flink-rpc-akka}}, which uses an *isolated
> classloader*. If the exception's cause chain contains a class that only
> exists in the user classloader, deserialization fails with
> {{ClassNotFoundException}} and the response is dropped: Pekko remoting logs an
> ERROR on the receiving side, but the waiting caller is never notified.
> Observed in production with Flink CDC 3.6.0 on Flink 1.20.3:
> {noformat}
> ERROR org.apache.pekko.remote.Remoting -
> com.mysql.cj.exceptions.ConnectionIsClosedException
> java.lang.ClassNotFoundException:
> com.mysql.cj.exceptions.ConnectionIsClosedException
> at org.apache.pekko.util.ClassLoaderObjectInputStream.resolveClass(...)
> at ...MiscMessageSerializer.deserializeStatusFailure(...)
> {noformat}
> Because the coordination response never arrives,
> {{SchemaOperator.sendRequestToCoordinator}} blocks on
> {{responseFuture.get(rpcTimeout)}} and fails ~3 minutes later with a
> *misleading* {{TimeoutException}}:
> {noformat}
> IllegalStateException: Failed to send request to coordinator:
> SchemaChangeRequest{...AddColumnEvent...}
> Caused by: java.util.concurrent.TimeoutException
> at ...SchemaOperator.sendRequestToCoordinator(SchemaOperator.java:243)
> {noformat}
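The following plain-JDK sketch (not Flink code) shows why the error is misleading. A caller blocked in {{get(timeout)}} on a future that nobody ever completes receives a bare {{TimeoutException}} with no cause. A future that was completed exceptionally instead surfaces the real failure as the cause of an {{ExecutionException}}. {{LostResponseDemo}} and {{awaitResponse}} are hypothetical names, and the 100 ms timeout only stands in for rpcTimeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class LostResponseDemo {

    /**
     * Blocks like a caller waiting for a coordination response and returns the
     * exception it ends up seeing (null if the response completed normally).
     */
    static Throwable awaitResponse(CompletableFuture<?> responseFuture, long timeoutMillis) {
        try {
            responseFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return null;
        } catch (ExecutionException e) {
            return e; // the response arrived; the real failure is e.getCause()
        } catch (TimeoutException e) {
            return e; // the response never arrived; nothing explains why
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return e;
        }
    }

    public static void main(String[] args) {
        // Response lost in transit: nobody ever completes the future.
        Throwable lost = awaitResponse(new CompletableFuture<String>(), 100);
        System.out.println("lost response      -> " + lost + ", cause = " + lost.getCause());

        // Response delivered: the coordinator's failure reaches the caller.
        CompletableFuture<String> delivered = new CompletableFuture<>();
        delivered.completeExceptionally(new IllegalStateException("connection is closed"));
        Throwable seen = awaitResponse(delivered, 100);
        System.out.println("delivered response -> " + seen + ", cause = " + seen.getCause());
    }
}
```

In the first case the only information left is that time ran out, which matches the {{TimeoutException}} in the stack trace above.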
> The job then restarts automatically, hits the same condition again, and
> recovers only after a *full job restart*: a transient error is amplified into
> a persistent restart loop, and the real cause is hidden behind a
> {{TimeoutException}}.
> h3. Root cause (confirmed)
> {{SchemaRegistry.failJob}} passes the original {{Throwable}} to
> {{completeExceptionally(...)}} and {{context.failJob(...)}}. When that
> throwable (or any element of its cause chain) is a user-classloader-only
> type, it cannot be deserialized by the isolated RPC classloader on the
> receiving side.
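To see which elements of a cause chain the receiving side cannot load, one can try to resolve each class name against that side's classloader. The sketch below is a hypothetical diagnostic helper, not part of Flink or Flink CDC. It simulates the isolated RPC classloader with a {{URLClassLoader}} that has no URLs and the bootstrap loader as parent, so it sees JDK classes but none of the application's classes; {{DriverLikeException}} stands in for the MySQL driver exception. It only walks the {{getCause()}} chain; suppressed exceptions and other serialized fields are not checked.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

public class CauseChainCheck {

    /** Stand-in for an exception class that only the user classloader can see. */
    static class DriverLikeException extends Exception {
        DriverLikeException(String message) {
            super(message);
        }
    }

    /** Returns the names of cause-chain classes that {@code receiverLoader} cannot load. */
    static List<String> unloadableCauses(Throwable failure, ClassLoader receiverLoader) {
        List<String> missing = new ArrayList<>();
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        // seen.add(t) returns false for a repeated element, which stops cyclic chains.
        for (Throwable t = failure; t != null && seen.add(t); t = t.getCause()) {
            String name = t.getClass().getName();
            try {
                Class.forName(name, false, receiverLoader); // resolve without initializing
            } catch (ClassNotFoundException e) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws Exception {
        // No URLs and a null (bootstrap) parent: sees JDK classes, not this application's.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            Throwable failure = new RuntimeException("schema change failed",
                    new DriverLikeException("connection is closed"));
            System.out.println("not loadable on the receiving side: "
                    + unloadableCauses(failure, isolated));
        }
    }
}
```

Running {{main}} reports only the nested {{DriverLikeException}}: the outer {{RuntimeException}} is loadable, but a single user-only class anywhere in the chain is enough to break deserialization.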
> h3. Trigger in this incident (inferred; JM logs are no longer available)
> The {{ConnectionIsClosedException}} most likely originated from the
> JobManager-side {{MySqlSourceEnumerator}} during table discovery / chunk
> splitting (the only JM component that holds a MySQL connection), then
> propagated across the coordinator RPC boundary. The exact producing site
> could not be pinned down without JM logs, but the deserialization failure
> itself is fully evidenced by the TaskManager stack above.
> h3. Fix
> Wrap the failure into {{org.apache.flink.util.SerializedThrowable}} in
> {{SchemaRegistry.failJob}} before it crosses the RPC boundary.
> {{SerializedThrowable}} carries the original exception as bytes plus a
> stringified stack trace, so the receiving side can deserialize it without the
> original class and still see the real cause. Failures raised in
> {{runInEventLoop}} are also routed through {{failJob}}, so every exit path
> shares the same wrapping; this covers both the regular and the distributed
> topologies.
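The shape of the fix can be sketched as follows. The sketch assumes a simplified coordinator that holds a list of pending response futures and a job-failure callback. {{TextOnlyThrowable}}, {{CoordinatorSketch}} and the synchronous {{runInEventLoop}} are hypothetical stand-ins, not the actual {{SchemaRegistry}} or {{SerializedThrowable}} code. The stand-in keeps only the original class name, message and stringified stack trace, and wraps each cause recursively. The real {{SerializedThrowable}} additionally carries the original exception as bytes.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class FailJobSketch {

    /** Stand-in for a user-classloader-only exception such as a JDBC driver error. */
    static class DriverLikeException extends Exception {
        DriverLikeException(String message) {
            super(message);
        }
    }

    /** HYPOTHETICAL text-only stand-in for SerializedThrowable; no user type survives in the chain. */
    static final class TextOnlyThrowable extends Exception {
        final String originalClassName;
        final String stringifiedStackTrace;

        TextOnlyThrowable(Throwable original) {
            this(original, Collections.newSetFromMap(new IdentityHashMap<>()));
        }

        private TextOnlyThrowable(Throwable original, Set<Throwable> seen) {
            super(original.getMessage());
            seen.add(original);
            this.originalClassName = original.getClass().getName();
            StringWriter trace = new StringWriter();
            original.printStackTrace(new PrintWriter(trace));
            this.stringifiedStackTrace = trace.toString();
            setStackTrace(original.getStackTrace());
            Throwable cause = original.getCause();
            if (cause instanceof TextOnlyThrowable) {
                initCause(cause); // already wrapped, keep as-is
            } else if (cause != null && !seen.contains(cause)) {
                initCause(new TextOnlyThrowable(cause, seen)); // wrap recursively, guarding against cycles
            }
        }

        @Override
        public String toString() {
            String message = getMessage();
            return message == null ? originalClassName : originalClassName + ": " + message;
        }
    }

    /** HYPOTHETICAL coordinator shape: pending responses plus a job-failure hook. */
    static final class CoordinatorSketch {
        final List<CompletableFuture<Object>> pendingResponses = new ArrayList<>();
        private final Consumer<Throwable> contextFailJob; // models context.failJob(...)

        CoordinatorSketch(Consumer<Throwable> contextFailJob) {
            this.contextFailJob = contextFailJob;
        }

        /** Single exit path: the failure is wrapped once, before it leaves the coordinator. */
        void failJob(Throwable failure) {
            Throwable wrapped = failure instanceof TextOnlyThrowable ? failure : new TextOnlyThrowable(failure);
            pendingResponses.forEach(response -> response.completeExceptionally(wrapped));
            pendingResponses.clear();
            contextFailJob.accept(wrapped);
        }

        /** Runs synchronously here (an assumption); any failure goes through failJob, never raw. */
        void runInEventLoop(Runnable action) {
            try {
                action.run();
            } catch (Throwable t) {
                failJob(t);
            }
        }
    }

    public static void main(String[] args) {
        List<Throwable> reportedToJob = new ArrayList<>();
        CoordinatorSketch coordinator = new CoordinatorSketch(reportedToJob::add);
        CompletableFuture<Object> response = new CompletableFuture<>();
        coordinator.pendingResponses.add(response);

        coordinator.runInEventLoop(() -> {
            throw new IllegalStateException("table discovery failed",
                    new DriverLikeException("connection is closed"));
        });

        Throwable reported = reportedToJob.get(0);
        System.out.println(reported + " / caused by " + reported.getCause());
    }
}
```

Wrapping in one place means the exceptional responses and the job failure carry the same wrapper, which is why routing {{runInEventLoop}} through {{failJob}} is enough to cover every exit path.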
> h3. Reproduction
> A minimal reproduction (also added as a unit test) serializes
> {{RuntimeException(cause = <class visible only to a child classloader>)}} and
> deserializes it with a classloader that cannot see that class:
> * Without the fix: {{ClassNotFoundException}} -> response lost.
> * With the fix ({{SerializedThrowable}}): deserializes successfully and
> preserves the real cause as text.
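The reproduction can be approximated in plain Java. This sketch makes three assumptions. First, the isolated RPC classloader is simulated by a {{URLClassLoader}} with no URLs and a bootstrap-only parent. Second, the input stream resolves every class against that loader, in the spirit of Pekko's {{ClassLoaderObjectInputStream}}. Third, to stay JDK-only, the "fixed" path replaces the failure with a plain {{RuntimeException}} that carries the stringified stack trace, rather than Flink's {{SerializedThrowable}}. All class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;

public class IsolatedClassLoaderRepro {

    /** Lives only on the application classpath, like a JDBC driver exception. */
    static class DriverLikeException extends Exception {
        DriverLikeException(String message) {
            super(message);
        }
    }

    /** Resolves every class in the stream against one fixed classloader. */
    static final class LoaderBoundObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderBoundObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    /** Simplified text-only stand-in for SerializedThrowable: JDK types only, no typed cause. */
    static RuntimeException toTextOnly(Throwable failure) {
        StringWriter trace = new StringWriter();
        failure.printStackTrace(new PrintWriter(trace));
        return new RuntimeException(trace.toString());
    }

    /** Serializes on the sender side and deserializes with the receiver's loader. */
    static String sendAcrossBoundary(Throwable failure, ClassLoader receiverLoader) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(failure);
            }
            try (ObjectInputStream in = new LoaderBoundObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()), receiverLoader)) {
                Throwable received = (Throwable) in.readObject();
                return "ok: " + received.getMessage();
            }
        } catch (ClassNotFoundException e) {
            return "lost: " + e; // the point where the incident's response was dropped
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            Throwable failure = new RuntimeException("schema change failed",
                    new DriverLikeException("connection is closed"));
            System.out.println("without fix -> " + sendAcrossBoundary(failure, isolated));
            System.out.println("with fix    -> " + sendAcrossBoundary(toTextOnly(failure), isolated));
        }
    }
}
```

Without wrapping, {{sendAcrossBoundary}} reports a {{ClassNotFoundException}} naming the nested exception class. With wrapping, the object deserializes and its text still names {{DriverLikeException: connection is closed}} as the cause.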