ashokkumar-allu commented on code in PR #18585:
URL: https://github.com/apache/hudi/pull/18585#discussion_r3286171341


##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java:
##########
@@ -63,6 +69,69 @@
  */
 public final class HoodieLocalEngineContext extends HoodieEngineContext {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieLocalEngineContext.class);
+
+  // When running in Java 11+, the common ForkJoinPool's workers don't inherit 
the application
+  // classloader, causing ClassNotFoundExceptions. We use a custom pool whose 
thread factory
+  // explicitly sets the correct classloader.
+  // See: 
https://stackoverflow.com/questions/66240365/java-11-upgrade-from-8-parallel-streams-throws-classnotfoundexception
+  private static final ForkJoinPool FORK_JOIN_POOL = initForkJoinPool();
+
+  private static ForkJoinPool initForkJoinPool() {
+    int javaVersion = 0;
+    try {
+      String specVersion = System.getProperty("java.specification.version");
+      // Pre-Java 9: "1.X" format; Java 9+: "11", "17", "21", etc.
+      javaVersion = specVersion.startsWith("1.")
+          ? Integer.parseInt(specVersion.split("\\.")[1])
+          : Integer.parseInt(specVersion.split("\\.")[0]);
+    } catch (NumberFormatException e) {
+      // Ignore, treat as pre-11
+    }
+    ForkJoinPool commonPool = ForkJoinPool.commonPool();
+    if (javaVersion >= 11) {
+      ForkJoinPool pool = new ForkJoinPool(commonPool.getParallelism(), 
makeWorkerThreadFactory(), null, commonPool.getAsyncMode());
+      LOG.info("Using custom fork-join pool: java version={}, #threads={}, 
asyncMode={}",
+          javaVersion, pool.getParallelism(), pool.getAsyncMode());
+      return pool;
+    }
+    LOG.info("Using common fork-join pool: java version={}, #threads={}, 
asyncMode={}",
+        javaVersion, commonPool.getParallelism(), commonPool.getAsyncMode());
+    return commonPool;
+  }
+
+  private static ForkJoinPool.ForkJoinWorkerThreadFactory 
makeWorkerThreadFactory() {
+    final String prefix = "hoodie-local-engine-context-pool-worker-";
+    return pool -> {
+      final ForkJoinWorkerThread worker = 
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+      worker.setName(prefix + worker.getPoolIndex());
+      
worker.setContextClassLoader(HoodieLocalEngineContext.class.getClassLoader());
+      LOG.info("Creating worker thread {} with class loader {}", 
worker.getName(), worker.getContextClassLoader());

Review Comment:
   Updated it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to