[ 
https://issues.apache.org/jira/browse/HUDI-5151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-5151:
---------------------------------
    Labels: pull-request-available  (was: )

> Flink data skipping doesn't work with ClassNotFoundException of 
> InLineFileSystem
> --------------------------------------------------------------------------------
>
>                 Key: HUDI-5151
>                 URL: https://issues.apache.org/jira/browse/HUDI-5151
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink, metadata
>            Reporter: Alexander Trushev
>            Assignee: Alexander Trushev
>            Priority: Major
>              Labels: pull-request-available
>
> h3. The problem
> Flink data skipping doesn't work, and the following warning appears in the log:
> {code:java}
> 7799 [main] WARN  org.apache.hudi.source.FileIndex [] - Read column stats for 
> data skipping error
> org.apache.hudi.exception.HoodieException: 
> org.apache.hudi.exception.HoodieException: Error occurs when executing map
>       ...
> Caused by: java.lang.ClassNotFoundException: Class 
> org.apache.hudi.common.fs.inline.InLineFileSystem not found
>       at 
> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2329) 
> ~[hadoop-common-2.10.1.jar:?]
>       ...
>       at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) 
> ~[?:1.8.0_345]
> {code}
> h3. The solution
> This problem was already addressed by 
> https://issues.apache.org/jira/browse/HUDI-3763,
> but that patch doesn't fix the Flink job case.
> We should use InLineFileSystem.class.getClassLoader() instead of 
> Thread.currentThread().getContextClassLoader(), because the method 
> lookupRecords(keys, fullKey) is called from a commonForkJoinPool worker thread, 
> which may carry the wrong contextClassLoader.
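
A minimal sketch of why the choice of class loader matters, not the actual Hudi patch. The class ClassLoaderChoiceSketch and its methods are hypothetical, and an empty URLClassLoader with no parent stands in for a context class loader that cannot see application classes. The worker thread fails to resolve an application class through its context class loader, while the loader that defined the class resolves it.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderChoiceSketch {

    /** Returns true if the given loader can resolve the class name. */
    static boolean canLoad(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /**
     * Runs both lookups on a worker thread whose context class loader is
     * "wrong". Index 0: lookup via the context class loader.
     * Index 1: lookup via the class's own defining loader.
     */
    static boolean[] resolveFromWorker() throws InterruptedException {
        final String name = ClassLoaderChoiceSketch.class.getName();
        // Hypothetical stand-in for a loader that cannot see application
        // classes: no URLs, and only the bootstrap loader as parent.
        final ClassLoader isolated = new URLClassLoader(new URL[0], null);
        final boolean[] results = new boolean[2];

        Thread worker = new Thread(() -> {
            results[0] = canLoad(name, Thread.currentThread().getContextClassLoader());
            results[1] = canLoad(name, ClassLoaderChoiceSketch.class.getClassLoader());
        });
        worker.setContextClassLoader(isolated);
        worker.start();
        worker.join(); // join() makes the worker's writes visible here
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = resolveFromWorker();
        System.out.println("via context class loader: " + r[0]);
        System.out.println("via defining class loader: " + r[1]);
    }
}
```

The defining loader of a class does not depend on which thread happens to run the code, which is the property the proposed change relies on.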
> h3. When the problem occurs
> It is an intermittent bug that is hard to reproduce. Below is the sequence of 
> events leading to the error. I can attach a Flink job if necessary.
>  # Flink runs the job with miniCluster
>  # A TemporaryClassLoaderContext containing SubmoduleClassLoader is passed to 
> the akka RpcSystem
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils:345
> {code:java}
> // akka internally caches the context class loader
> // make sure it uses the plugin class loader
> try (TemporaryClassLoaderContext ignored =
>         TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
> {code}
>  # *All akka threads have SubmoduleClassLoader as a contextClassLoader* in 
> thread local storage
>  # *An akka thread creates completableFuture*
> org.apache.flink.runtime.rpc.akka.SupervisorActor:206
> {code:java}
> return Patterns.ask(
>                 supervisor,
>                 createStartAkkaRpcActorMessage(propsFactory, endpointId),
>                 RpcUtils.INF_DURATION)
>         .toCompletableFuture()
>         .thenApply(SupervisorActor.StartAkkaRpcActorResponse.class::cast)
>         .join();
> {code}
>  # Creation of a worker thread in commonForkJoinPool is triggered
> java.util.concurrent.ForkJoinPool:1485
> {code:java}
> try {
>     if (fac != null && (wt = fac.newThread(this)) != null) {
>         wt.start();
> {code}
>  # *The commonForkJoinPool worker thread inherits SubmoduleClassLoader* as its 
> contextClassLoader because it is created from an akka thread
>  # That commonForkJoinPool worker creates another one. Now two workers carry 
> the wrong classLoader
>  # The commonForkJoinPool can then end up consisting of
> 2 workers with SubmoduleClassLoader
> 8 workers with AppClassLoader
>  # Hudi uses java parallel stream which performs its tasks using 
> commonForkJoinPool as well
> org.apache.hudi.metadata.HoodieBackedTableMetadata:160
> {code:java}
> return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) : 
> engineContext.parallelize(partitionFileSlices))
>     .flatMap((SerializableFunction<FileSlice, 
> Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
> {code}
>  # *lookupRecords is called from commonForkJoinPool worker*
> With 2 of 10 workers affected, there is a 20% chance that 
> Thread.currentThread().getContextClassLoader() returns the wrong classLoader
> org.apache.hudi.common.table.log.block.HoodieHFileDataBlock:197
> {code:java}
> inlineConf.setClassLoader(Thread.currentThread().getContextClassLoader());
> {code}
>  # Class.forName in hadoop Configuration throws ClassNotFoundException
> org.apache.hadoop.conf.Configuration:2362
> {code:java}
> clazz = Class.forName(name, true, classLoader);
> {code}
>  # FileIndex logs the warning, and data skipping does not take effect
> org.apache.hudi.source.FileIndex:272
> {code:java}
> } catch (Throwable throwable) {
>   LOG.warn("Read column stats for data skipping error", throwable);
>   return null;
> {code}
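
The inheritance described in the steps above can be sketched with plain threads. This is an illustration under stated assumptions, not Flink or Hudi code: the class InheritedContextLoaderSketch is hypothetical, an empty URLClassLoader stands in for SubmoduleClassLoader, and ordinary Thread objects are used in place of commonForkJoinPool workers, whose creation details may vary between JDK versions. A new Thread takes the context class loader of the thread that creates it, so the "wrong" loader propagates from parent to child to grandchild.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class InheritedContextLoaderSketch {

    /** Starts a thread and waits for it, converting interruption to unchecked. */
    static void startAndJoin(Thread t) {
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /**
     * Returns {marker, loader seen by child, loader seen by grandchild}.
     * The marker loader is set only on the parent thread.
     */
    static ClassLoader[] observe() {
        // Hypothetical stand-in for SubmoduleClassLoader.
        final ClassLoader marker = new URLClassLoader(new URL[0], null);
        final ClassLoader[] seen = new ClassLoader[3];
        seen[0] = marker;

        Thread parent = new Thread(() -> {
            // Created on the parent thread: inherits the parent's context loader.
            Thread child = new Thread(() -> {
                seen[1] = Thread.currentThread().getContextClassLoader();
                // Created on the child thread: inherits it once more.
                Thread grandchild = new Thread(() ->
                        seen[2] = Thread.currentThread().getContextClassLoader());
                startAndJoin(grandchild);
            });
            startAndJoin(child);
        });
        parent.setContextClassLoader(marker);
        startAndJoin(parent);
        return seen;
    }

    public static void main(String[] args) {
        ClassLoader[] seen = observe();
        System.out.println("child inherited marker: " + (seen[1] == seen[0]));
        System.out.println("grandchild inherited marker: " + (seen[2] == seen[0]));
    }
}
```

Neither the child nor the grandchild ever sets a context class loader itself, yet both end up with the marker loader, which mirrors how a pool worker created from an akka thread can carry a loader that does not see Hudi classes.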



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
