[ https://issues.apache.org/jira/browse/FLINK-40040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-40040:
-----------------------------------
    Labels: pull-request-available  (was: )

> SchemaRegistry loses coordination responses when a failure's cause is a 
> user-classloader-only class (e.g. a JDBC driver exception), causing the job 
> to hang until rpcTimeout and enter a restart loop
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-40040
>                 URL: https://issues.apache.org/jira/browse/FLINK-40040
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.6.0
>            Reporter: Jiyong Wang
>            Priority: Major
>              Labels: pull-request-available
>
> When the schema-evolution coordinator ({{SchemaRegistry}}) completes a 
> coordination response exceptionally, the exception crosses the 
> operator-coordinator RPC boundary and is deserialized on the 
> {{SchemaOperator}} side by {{flink-rpc-akka}}, which uses an *isolated 
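> classloader*. If the exception or any element of its cause chain is a class that only 
> exists in the user classloader, deserialization fails with 
> {{ClassNotFoundException}} and the response is dropped: Pekko's {{Remoting}} logs the 
> error on the receiving side, but the waiting caller is never notified.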
> Observed in production with Flink CDC 3.6.0 on Flink 1.20.3:
> {noformat}
> ERROR org.apache.pekko.remote.Remoting - 
> com.mysql.cj.exceptions.ConnectionIsClosedException
> java.lang.ClassNotFoundException: 
> com.mysql.cj.exceptions.ConnectionIsClosedException
>     at org.apache.pekko.util.ClassLoaderObjectInputStream.resolveClass(...)
>     at ...MiscMessageSerializer.deserializeStatusFailure(...)
> {noformat}
> Because the coordination response never arrives, 
> {{SchemaOperator.sendRequestToCoordinator}} blocks on 
> {{responseFuture.get(rpcTimeout)}} and fails ~3 minutes later with a 
> *misleading* {{TimeoutException}}:
> {noformat}
> IllegalStateException: Failed to send request to coordinator: 
> SchemaChangeRequest{...AddColumnEvent...}
> Caused by: java.util.concurrent.TimeoutException
>     at ...SchemaOperator.sendRequestToCoordinator(SchemaOperator.java:243)
> {noformat}
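The misleading {{TimeoutException}} follows from how a blocking wait on a future behaves. If the response is dropped, the future is never completed and the caller can only report that time ran out. If the response is delivered completed exceptionally, the real cause surfaces through {{ExecutionException}}. A minimal Java sketch, assuming a plain {{CompletableFuture}} stands in for the coordinator response; {{DroppedResponseDemo}} and {{await}} are hypothetical names, not {{SchemaOperator}} code, and the 100 ms timeout replaces the ~3 minute {{rpcTimeout}}:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model of a caller blocking on responseFuture.get(rpcTimeout). */
public final class DroppedResponseDemo {

    /** Waits for the response and describes what the caller was able to observe. */
    static String await(CompletableFuture<String> responseFuture, long timeoutMillis) {
        try {
            return "response: " + responseFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The response was never delivered: the real cause is invisible to the caller.
            return "timeout";
        } catch (ExecutionException e) {
            // The response arrived completed exceptionally: the real cause is visible.
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Models a response lost in transit (e.g. dropped after a deserialization failure).
        CompletableFuture<String> dropped = new CompletableFuture<>();
        // Models a response that made it across and carries the original failure.
        CompletableFuture<String> delivered = new CompletableFuture<>();
        delivered.completeExceptionally(new IllegalStateException("connection is closed"));

        System.out.println(await(dropped, 100));
        System.out.println(await(delivered, 100));
    }
}
```

Running {{main}} prints {{timeout}} for the dropped response and {{failed: java.lang.IllegalStateException: connection is closed}} for the delivered one, which is the difference between the observed symptom and the diagnosis the job should have produced.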
> The job then restarts, hits the same condition again, and only a *full job 
> restart* recovers it: a transient error is amplified into a permanent restart 
> loop, and the real cause is hidden behind a {{TimeoutException}}.
> h3. Root cause (confirmed)
> {{SchemaRegistry.failJob}} passes the original {{Throwable}} to 
> {{completeExceptionally(...)}} and {{context.failJob(...)}}. When that 
> throwable (or any element of its cause chain) is a user-classloader-only 
> type, it cannot be deserialized by the isolated RPC classloader on the 
> receiving side.
> h3. Trigger in this incident (inferred; JM logs are no longer available)
> The {{ConnectionIsClosedException}} most likely originated from the 
> JobManager-side {{MySqlSourceEnumerator}} during table discovery / chunk 
> splitting (the only JM component that holds a MySQL connection), then 
> propagated across the coordinator RPC boundary. The exact producing site 
> could not be pinned down without JM logs, but the deserialization failure 
> itself is fully evidenced by the TaskManager stack above.
> h3. Fix
> Wrap the failure into {{org.apache.flink.util.SerializedThrowable}} in 
> {{SchemaRegistry.failJob}} before it crosses the RPC boundary. 
> {{SerializedThrowable}} carries the original exception as bytes plus a 
> stringified stack trace, so the receiving side can deserialize it without the 
> original class and still see the real cause. {{runInEventLoop}} is routed 
> through {{failJob}} so all exit paths share the wrapping; this covers both 
> the regular and distributed topologies.
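The single-exit-path idea can be sketched as follows. This is a hypothetical, JDK-only model, not the actual {{SchemaRegistry}} code: {{CoordinatorSketch}}, its constructor parameters, and the injected {{wrapper}} function are assumptions (in Flink the wrapper role would be played by {{SerializedThrowable::new}}). Handing the wrapped failure to the job-failure callback as well as to the response future is also an assumption of the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical model of a coordinator whose failure exits all go through one method. */
public final class CoordinatorSketch {
    private final Consumer<Throwable> jobFailer;          // stand-in for context.failJob(...)
    private final Function<Throwable, Throwable> wrapper; // assumption: e.g. SerializedThrowable::new

    CoordinatorSketch(Consumer<Throwable> jobFailer, Function<Throwable, Throwable> wrapper) {
        this.jobFailer = jobFailer;
        this.wrapper = wrapper;
    }

    /** The only place failures leave the coordinator, so the wrapping cannot be bypassed. */
    void failJob(Throwable failure, CompletableFuture<?> pendingResponse) {
        Throwable safe = wrapper.apply(failure);
        pendingResponse.completeExceptionally(safe); // this is what would cross the RPC boundary
        jobFailer.accept(safe);
    }

    /** Runs an action; errors are routed through failJob instead of completing the response directly. */
    void runInEventLoop(Runnable action, CompletableFuture<?> pendingResponse) {
        try {
            action.run();
        } catch (Throwable t) {
            failJob(t, pendingResponse);
        }
    }

    public static void main(String[] args) {
        CoordinatorSketch coordinator = new CoordinatorSketch(
                t -> System.out.println("job failed with: " + t),
                t -> new RuntimeException("wrapped: " + t)); // JDK-only placeholder wrapper
        CompletableFuture<Void> response = new CompletableFuture<>();
        coordinator.runInEventLoop(() -> { throw new IllegalStateException("boom"); }, response);
        System.out.println("response failed: " + response.isCompletedExceptionally());
    }
}
```

Injecting the wrapper keeps the sketch free of Flink dependencies; the design point is only that no code path calls {{completeExceptionally}} with the raw throwable.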
> h3. Reproduction
> A minimal reproduction (also added as a unit test) serializes 
> {{RuntimeException(cause = <class visible only to a child classloader>)}} and 
> deserializes it with a classloader that cannot see that class:
> * Without the fix: {{ClassNotFoundException}} -> response lost.
> * With the fix ({{SerializedThrowable}}): deserializes successfully and 
> preserves the real cause as text.
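The reproduction can be approximated without Flink on the classpath. The sketch below is an assumption-laden stand-in, not the unit test added with the fix: instead of a real child classloader, it simulates the isolated RPC classloader with an {{ObjectInputStream}} whose {{resolveClass}} refuses a set of class names. {{StringifiedThrowable}} is a hypothetical JDK-only substitute for Flink's {{SerializedThrowable}}: it keeps the original exception as bytes plus its stringified stack trace, and wraps each cause in the same stand-in type. All class and method names are hypothetical.

```java
import java.io.*;
import java.util.Set;

public final class SerializedThrowableRepro {

    /** Plays the role of a class that exists only in the user classloader (e.g. a JDBC driver exception). */
    static final class UserOnlyException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        UserOnlyException(String message) { super(message); }
    }

    /** Hypothetical JDK-only stand-in for SerializedThrowable: original bytes + stringified trace. */
    static final class StringifiedThrowable extends Exception {
        private static final long serialVersionUID = 1L;
        final String originalClassName;
        final String fullStackTrace;
        final byte[] serializedOriginal; // opaque bytes: never resolved while reading the wrapper

        StringifiedThrowable(Throwable original) {
            super(original.toString(),
                  original.getCause() == null ? null : new StringifiedThrowable(original.getCause()));
            this.originalClassName = original.getClass().getName();
            StringWriter sw = new StringWriter();
            original.printStackTrace(new PrintWriter(sw));
            this.fullStackTrace = sw.toString();
            byte[] bytes;
            try { bytes = serialize(original); } catch (IOException e) { bytes = null; }
            this.serializedOriginal = bytes;
            setStackTrace(original.getStackTrace());
        }
    }

    /** Simulates an isolated classloader by refusing to resolve the given class names. */
    static final class IsolatedObjectInputStream extends ObjectInputStream {
        private final Set<String> hiddenClassNames;

        IsolatedObjectInputStream(InputStream in, Set<String> hiddenClassNames) throws IOException {
            super(in);
            this.hiddenClassNames = hiddenClassNames;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            if (hiddenClassNames.contains(desc.getName())) {
                throw new ClassNotFoundException(desc.getName());
            }
            return super.resolveClass(desc);
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes, Set<String> hiddenClassNames)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new IsolatedObjectInputStream(new ByteArrayInputStream(bytes), hiddenClassNames)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Set<String> hidden = Set.of(UserOnlyException.class.getName());
        RuntimeException failure =
                new RuntimeException("schema change failed", new UserOnlyException("connection is closed"));

        try {
            deserialize(serialize(failure), hidden);
            System.out.println("unexpected: plain failure deserialized");
        } catch (ClassNotFoundException e) {
            System.out.println("without wrapping: ClassNotFoundException " + e.getMessage());
        }

        StringifiedThrowable wrapped =
                (StringifiedThrowable) deserialize(serialize(new StringifiedThrowable(failure)), hidden);
        System.out.println("with wrapping: " + wrapped.getCause().getMessage());
    }
}
```

Running {{main}} reports a {{ClassNotFoundException}} for the plain failure and, for the wrapped one, prints the original cause as text. The wrapper is safe because the original object graph lives only inside a {{byte[]}}, so reading it resolves nothing but JDK types and the wrapper class itself.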



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
