ashokkumar-allu commented on code in PR #18585:
URL: https://github.com/apache/hudi/pull/18585#discussion_r3383878925
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java:
##########
@@ -63,6 +69,70 @@
*/
public final class HoodieLocalEngineContext extends HoodieEngineContext {
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieLocalEngineContext.class);
+
+ // When running in Java 11+, the common ForkJoinPool's workers don't inherit the application
+ // classloader, causing ClassNotFoundExceptions. We use a custom pool whose thread factory
+ // explicitly sets the correct classloader.
+ // See: https://stackoverflow.com/questions/66240365/java-11-upgrade-from-8-parallel-streams-throws-classnotfoundexception
+ // Lazy holder: initialized on first use, not at class-load time (JLS 12.4.2 guarantees thread safety).
+ private static class PoolHolder {
+ static final ForkJoinPool INSTANCE = initForkJoinPool();
+ }
+
+ private static ForkJoinPool initForkJoinPool() {
+ int javaVersion = 0;
+ try {
+ String specVersion = System.getProperty("java.specification.version");
+ // "1.X" means Java 8 (Hudi's minimum, always < 11); Java 9+ uses plain major version.
+ javaVersion = specVersion.startsWith("1.") ? 8 : Integer.parseInt(specVersion.split("\\.")[0]);
+ } catch (NumberFormatException e) {
+ // Ignore, treat as pre-11
+ }
+ ForkJoinPool commonPool = ForkJoinPool.commonPool();
+ if (javaVersion >= 11) {
+ ForkJoinPool pool = new ForkJoinPool(commonPool.getParallelism(), makeWorkerThreadFactory(), null, commonPool.getAsyncMode());
+ LOG.info("Using custom fork-join pool: java version={}, #threads={}, asyncMode={}",
+ javaVersion, pool.getParallelism(), pool.getAsyncMode());
+ return pool;
+ }
+ LOG.info("Using common fork-join pool: java version={}, #threads={}, asyncMode={}",
+ javaVersion, commonPool.getParallelism(), commonPool.getAsyncMode());
+ return commonPool;
+ }
+
+ private static ForkJoinPool.ForkJoinWorkerThreadFactory makeWorkerThreadFactory() {
+ final String prefix = "hoodie-local-engine-context-pool-worker-";
+ return pool -> {
+ final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+ worker.setName(prefix + worker.getPoolIndex());
+ worker.setContextClassLoader(HoodieLocalEngineContext.class.getClassLoader());
+ LOG.debug("Creating worker thread {} with class loader {}", worker.getName(), worker.getContextClassLoader());
+ return worker;
+ };
+ }
+
+ /**
+ * Runs {@code func} over {@code data} in parallel using a classloader-aware ForkJoinPool.
+ * Unchecked exceptions thrown by {@code func} propagate as-is; checked exceptions are wrapped
+ * in {@link RuntimeException}.
+ */
+ public static <I, O> List<O> mapParallel(List<I> data, SerializableFunction<I, O> func) {
Review Comment:
Done — updated the Javadoc to call out that `mapParallel` is a `static`
utility method specific to `HoodieLocalEngineContext` and is NOT present in
other `HoodieEngineContext` implementations.
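For readers unfamiliar with the pattern in the diff, here is a minimal standalone sketch of the same idea, assuming plain JDK APIs only. The class name `ClassLoaderAwarePool`, the `sketch-pool-worker-` prefix, the counter-based thread naming, and the use of `java.util.function.Function` in place of Hudi's `SerializableFunction` are all hypothetical choices for illustration. This is not the PR's `mapParallel` body, which the diff does not show, and it skips the Java-version gate (the parsing helper is included separately). It relies on long-standing but unspecified JDK behavior: a parallel stream started from a ForkJoinPool worker forks its subtasks onto that worker's pool.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical standalone helper illustrating the pattern; not Hudi code.
final class ClassLoaderAwarePool {

  // Initialization-on-demand holder: INSTANCE is built the first time PoolHolder
  // is touched, and JLS 12.4.2 makes that class initialization thread-safe.
  private static final class PoolHolder {
    static final ForkJoinPool INSTANCE = new ForkJoinPool(
        ForkJoinPool.commonPool().getParallelism(),
        workerFactory(),
        null, // no custom UncaughtExceptionHandler
        ForkJoinPool.commonPool().getAsyncMode());
  }

  // Every worker gets this class's loader as its context classloader, so code that
  // resolves classes via Thread.getContextClassLoader() can see the application classes.
  private static ForkJoinPool.ForkJoinWorkerThreadFactory workerFactory() {
    final ClassLoader loader = ClassLoaderAwarePool.class.getClassLoader();
    final AtomicInteger counter = new AtomicInteger(); // hypothetical naming scheme
    return pool -> {
      ForkJoinWorkerThread worker =
          ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
      worker.setName("sketch-pool-worker-" + counter.getAndIncrement());
      worker.setContextClassLoader(loader);
      return worker;
    };
  }

  // Same "1.X" -> 8 rule as the diff; missing or unparseable values count as pre-11 (0).
  static int javaMajorVersion(String specVersion) {
    if (specVersion == null) {
      return 0;
    }
    try {
      return specVersion.startsWith("1.") ? 8 : Integer.parseInt(specVersion.split("\\.")[0]);
    } catch (NumberFormatException e) {
      return 0;
    }
  }

  // Static, like the diff's mapParallel: callers use the class, not an engine-context instance.
  // The stream is started inside a task running on the custom pool, so its subtasks are
  // executed by that pool's workers instead of the common pool's.
  static <I, O> List<O> mapParallel(List<I> data, Function<I, O> func) {
    return PoolHolder.INSTANCE
        .submit(() -> data.parallelStream().map(func).collect(Collectors.toList()))
        .join(); // join() reports failure as RuntimeException/Error, not ExecutionException
  }
}
```

Because the method is static, a caller writes `ClassLoaderAwarePool.mapParallel(items, x -> ...)` without holding any context object. That is the same distinction the updated Javadoc draws between `HoodieLocalEngineContext` and the other `HoodieEngineContext` implementations.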