[ https://issues.apache.org/jira/browse/HUDI-5151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated HUDI-5151:
---------------------------------
Labels: pull-request-available (was: )
> Flink data skipping doesn't work with ClassNotFoundException of InLineFileSystem
> --------------------------------------------------------------------------------
>
> Key: HUDI-5151
> URL: https://issues.apache.org/jira/browse/HUDI-5151
> Project: Apache Hudi
> Issue Type: Bug
> Components: flink, metadata
> Reporter: Alexander Trushev
> Assignee: Alexander Trushev
> Priority: Major
> Labels: pull-request-available
>
> h3. The problem
> Flink data skipping does not work, and the following warning appears in the log:
> {code:java}
> 7799 [main] WARN org.apache.hudi.source.FileIndex [] - Read column stats for data skipping error
> org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Error occurs when executing map
> ...
> Caused by: java.lang.ClassNotFoundException: Class org.apache.hudi.common.fs.inline.InLineFileSystem not found
>     at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2329) ~[hadoop-common-2.10.1.jar:?]
> ...
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ~[?:1.8.0_345]
> {code}
> h3. The solution
> This problem was already fixed by
> https://issues.apache.org/jira/browse/HUDI-3763, but that patch does not fix
> the Flink job.
> We should use InLineFileSystem.class.getClassLoader() instead of
> Thread.currentThread().getContextClassLoader(), because the method
> lookupRecords(keys, fullKey) is called from a commonForkJoinPool worker thread,
> which may carry the wrong contextClassLoader.
> h3. When the problem occurs
> This is an intermittent bug that is hard to reproduce. Below is the sequence of
> events leading to the error. I can attach the Flink job if necessary.
> # Flink runs the job on a MiniCluster
> # A TemporaryClassLoaderContext holding the SubmoduleClassLoader is passed to
> the akka RpcSystem
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils:345
> {code:java}
> // akka internally caches the context class loader
> // make sure it uses the plugin class loader
> try (TemporaryClassLoaderContext ignored =
> TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
> {code}
> # *All akka threads have SubmoduleClassLoader as their contextClassLoader*,
> which is stored per thread
> # *An akka thread creates a CompletableFuture*
> org.apache.flink.runtime.rpc.akka.SupervisorActor:206
> {code:java}
> return Patterns.ask(
> supervisor,
> createStartAkkaRpcActorMessage(propsFactory, endpointId),
> RpcUtils.INF_DURATION)
> .toCompletableFuture()
> .thenApply(SupervisorActor.StartAkkaRpcActorResponse.class::cast)
> .join();
> {code}
> # Creation of a worker thread in the commonForkJoinPool is triggered
> java.util.concurrent.ForkJoinPool:1485
> {code:java}
> try {
> if (fac != null && (wt = fac.newThread(this)) != null) {
> wt.start();
> {code}
> # *The commonForkJoinPool worker thread inherits SubmoduleClassLoader* as its
> contextClassLoader because it is created from an akka thread
> # That commonForkJoinPool worker creates another one. Now two workers have the
> wrong classLoader
> # As a result, the commonForkJoinPool can end up with 2 workers that have
> SubmoduleClassLoader and 8 workers that have AppClassLoader
> # Hudi uses a Java parallel stream, which also runs its tasks on the
> commonForkJoinPool
> org.apache.hudi.metadata.HoodieBackedTableMetadata:160
> {code:java}
> return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) :
> engineContext.parallelize(partitionFileSlices))
> .flatMap((SerializableFunction<FileSlice,
> Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
> {code}
> # *lookupRecords is called from a commonForkJoinPool worker*
> With 2 of the 10 workers carrying the wrong loader, there is about a 20% chance
> that Thread.currentThread().getContextClassLoader() returns the wrong classLoader
> org.apache.hudi.common.table.log.block.HoodieHFileDataBlock:197
> {code:java}
> inlineConf.setClassLoader(Thread.currentThread().getContextClassLoader());
> {code}
> # Class.forName in Hadoop's Configuration throws ClassNotFoundException
> org.apache.hadoop.conf.Configuration:2362
> {code:java}
> clazz = Class.forName(name, true, classLoader);
> {code}
> # FileIndex logs the warning, and data skipping does not work
> org.apache.hudi.source.FileIndex:272
> {code:java}
> } catch (Throwable throwable) {
> LOG.warn("Read column stats for data skipping error", throwable);
> return null;
> {code}