noorall opened a new pull request, #27739:
URL: https://github.com/apache/flink/pull/27739

   ## What is the purpose of the change
   
   This bug is caused by two factors that jointly lead to deserialization 
failures:
   
   **Cause 1: The whitelist-based routing strategy of 
`PlannerComponentClassLoader` prevents fallback to the parent for 
non-whitelisted classes.**  
   `PlannerComponentClassLoader` uses a strict **whitelist-based** routing 
strategy: only whitelisted packages are loaded according to the configured 
lookup order (e.g., parent-first/component-first) and are allowed to fall back 
to the parent `ClassLoader` when not found.  
   For **non-whitelisted** packages (e.g., user-defined POJOs), it defaults to 
**component-only** lookup and does not fall back to the parent. Since these 
user classes exist only in the user JAR (typically visible to the parent/user 
class loader) and not in the isolated `table-planner-*.jar` (the component), 
deserialization fails with a `ClassNotFoundException`.
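   
   The routing described above can be sketched roughly as follows. This is a simplified illustration, not the actual `PlannerComponentClassLoader` code: the class name `RoutingSketchClassLoader`, its fields, and the `fallbackToParent` flag are all hypothetical, and whitelisted packages are assumed to be parent-first for brevity.

```java
import java.util.List;

/**
 * Simplified illustration of whitelist-based routing. This is NOT the code of
 * Flink's PlannerComponentClassLoader; every name below is hypothetical.
 */
class RoutingSketchClassLoader extends ClassLoader {
    private final ClassLoader componentLoader;      // stands in for the isolated planner JAR
    private final ClassLoader parentLoader;         // stands in for the parent/user class loader
    private final List<String> whitelistedPrefixes; // packages routed parent-first in this sketch
    private final boolean fallbackToParent;         // false: strict behavior, true: with fallback

    RoutingSketchClassLoader(
            ClassLoader componentLoader,
            ClassLoader parentLoader,
            List<String> whitelistedPrefixes,
            boolean fallbackToParent) {
        super(null); // no implicit delegation; routing happens explicitly in loadClass
        this.componentLoader = componentLoader;
        this.parentLoader = parentLoader;
        this.whitelistedPrefixes = whitelistedPrefixes;
        this.fallbackToParent = fallbackToParent;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        for (String prefix : whitelistedPrefixes) {
            if (name.startsWith(prefix)) {
                // Whitelisted: try the parent first, then fall back to the component.
                try {
                    return parentLoader.loadClass(name);
                } catch (ClassNotFoundException e) {
                    return componentLoader.loadClass(name);
                }
            }
        }
        // Non-whitelisted: component only, unless the fallback is enabled.
        try {
            return componentLoader.loadClass(name);
        } catch (ClassNotFoundException e) {
            if (fallbackToParent) {
                return parentLoader.loadClass(name);
            }
            throw e;
        }
    }
}
```

   With `fallbackToParent` set to `false`, a class that exists only in the parent loader fails with a `ClassNotFoundException`, which mirrors the failure described above. With the flag set to `true`, the same lookup succeeds through the parent.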
   
   **Cause 2: Deserialization incorrectly uses the current thread’s class 
loader instead of the user class loader.**  
   During deserialization of join-type related objects (and similar operator 
metadata), the code uses `Thread.currentThread().getContextClassLoader()` 
rather than the user class loader. In E2E scenarios, classes from the user JAR 
are only visible to the user class loader and are not necessarily present in 
the current thread’s class loader. As a result, operators that use 
user-jar-defined classes as join types cannot be deserialized correctly.
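   
   To illustrate the difference, here is a minimal sketch using only standard Java serialization, not Flink's own utilities. The names `ClassLoaderObjectInputStream`, `DeserializationSketch`, and `SamplePojo` are hypothetical. The point is that the class loader used to resolve classes is passed in explicitly rather than taken from `Thread.currentThread().getContextClassLoader()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical stand-in for a user-defined type that lives only in the user JAR. */
class SamplePojo implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;

    SamplePojo(String name) {
        this.name = name;
    }
}

/** Resolves classes against an explicitly supplied class loader. */
class ClassLoaderObjectInputStream extends ObjectInputStream {
    private final ClassLoader classLoader;

    ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
        super(in);
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
        // Sketch only: serialized primitive Class objects (e.g. int.class) are not handled.
        return Class.forName(desc.getName(), false, classLoader);
    }
}

class DeserializationSketch {
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** The caller decides which loader resolves classes, e.g. the user class loader. */
    static Object deserialize(byte[] data, ClassLoader userClassLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ClassLoaderObjectInputStream(new ByteArrayInputStream(data), userClassLoader)) {
            return in.readObject();
        }
    }
}
```

   Calling `DeserializationSketch.deserialize(bytes, loader)` with a loader that can see `SamplePojo` succeeds, while a loader that cannot see it throws a `ClassNotFoundException`, independent of the thread context class loader.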
   
   **Required fix:**  
   1. Change the default loading behavior for non-whitelisted packages so that 
the lookup automatically falls back to the parent `ClassLoader` when a class is 
not found in the component. This ensures custom user types can be resolved 
during AdaptiveJoin deserialization.
   2. Use `UserClassLoader` instead of the current thread context `ClassLoader`.
   
   ## Brief change log
   
     - *Fix job crashes in the join operator when using custom types or custom 
type serializers*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


