Alexander Trushev created HUDI-5151:
---------------------------------------

             Summary: Flink data skipping doesn't work with ClassNotFoundException of InLineFileSystem
                 Key: HUDI-5151
                 URL: https://issues.apache.org/jira/browse/HUDI-5151
             Project: Apache Hudi
          Issue Type: Bug
          Components: flink, metadata
            Reporter: Alexander Trushev
            Assignee: Alexander Trushev


h3. The problem

Flink data skipping doesn't work, and the following warning appears in the log:
{code:java}
7799 [main] WARN  org.apache.hudi.source.FileIndex [] - Read column stats for data skipping error
org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Error occurs when executing map
        ...
Caused by: java.lang.ClassNotFoundException: Class org.apache.hudi.common.fs.inline.InLineFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2329) ~[hadoop-common-2.10.1.jar:?]
        ...
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ~[?:1.8.0_345]
{code}
h3. The solution

This problem has already been fixed by https://issues.apache.org/jira/browse/HUDI-3763, but that patch doesn't fix it for Flink jobs.

We should use InLineFileSystem.class.getClassLoader() instead of Thread.currentThread().getContextClassLoader(), because the method lookupRecords(keys, fullKey) is called from a commonForkJoinPool worker thread, which may hold the wrong contextClassLoader.
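A minimal, self-contained sketch of the difference, assuming plain JDK class loading rather than real Hudi or Hadoop classes: InLineFileSystemStandIn is a hypothetical stand-in for InLineFileSystem, and a URLClassLoader with no parent plays the role of the wrong contextClassLoader. The lookup mirrors the Class.forName(name, true, classLoader) call that Hadoop's Configuration makes. In HoodieHFileDataBlock, the proposed change would correspond to inlineConf.setClassLoader(InLineFileSystem.class.getClassLoader()).

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

// Minimal sketch, not Hudi code. InLineFileSystemStandIn is a hypothetical
// stand-in for org.apache.hudi.common.fs.inline.InLineFileSystem.
final class ClassLoaderChoice {

    // Hypothetical stand-in; defined by the same loader as ClassLoaderChoice.
    static final class InLineFileSystemStandIn { }

    // Mirrors the lookup in Hadoop's Configuration: Class.forName(name, true, classLoader).
    static boolean canLoad(String className, ClassLoader loader) {
        try {
            Class.forName(className, true, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    // Returns {found via context class loader, found via defining class loader},
    // both evaluated on a thread whose context class loader cannot see application classes.
    static boolean[] lookupOnThreadWithWrongContextLoader() {
        String name = InLineFileSystemStandIn.class.getName();
        // Parent null means bootstrap loader only: this loader sees JDK classes but no
        // application classes (the role SubmoduleClassLoader plays in this issue).
        ClassLoader wrongLoader = new URLClassLoader(new URL[0], null);
        boolean[] found = new boolean[2];
        Thread worker = new Thread(() -> {
            // Current behaviour: resolve through the thread's context class loader.
            found[0] = canLoad(name, Thread.currentThread().getContextClassLoader());
            // Proposed fix: resolve through the loader that defined the class.
            found[1] = canLoad(name, InLineFileSystemStandIn.class.getClassLoader());
        });
        worker.setContextClassLoader(wrongLoader);
        worker.start();
        try {
            worker.join(); // join() also makes the writes to found visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return found;
    }

    public static void main(String[] args) {
        boolean[] found = lookupOnThreadWithWrongContextLoader();
        System.out.println("context class loader:  " + found[0]); // false
        System.out.println("defining class loader: " + found[1]); // true
    }
}
{code}

Resolving through the class's own defining loader gives the same answer on every thread, so it no longer matters which thread created the worker that runs lookupRecords.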
h3. When the problem occurs

This is an intermittent bug that is hard to reproduce. The sequence of events leading to the error is described below. I can attach the Flink job if necessary.
 # Flink runs the job with a MiniCluster
 # A TemporaryClassLoaderContext containing the SubmoduleClassLoader is passed to the akka RpcSystem
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils:345
{code:java}
// akka internally caches the context class loader
// make sure it uses the plugin class loader
try (TemporaryClassLoaderContext ignored =
        TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
{code}

 # *All akka threads have SubmoduleClassLoader as their contextClassLoader* (the context class loader is stored per thread)
 # *An akka thread creates a CompletableFuture*
org.apache.flink.runtime.rpc.akka.SupervisorActor:206
{code:java}
return Patterns.ask(
                supervisor,
                createStartAkkaRpcActorMessage(propsFactory, endpointId),
                RpcUtils.INF_DURATION)
        .toCompletableFuture()
        .thenApply(SupervisorActor.StartAkkaRpcActorResponse.class::cast)
        .join();
{code}

 # This triggers the creation of a worker thread in commonForkJoinPool
java.util.concurrent.ForkJoinPool:1485
{code:java}
try {
    if (fac != null && (wt = fac.newThread(this)) != null) {
        wt.start();
{code}

 # *The commonForkJoinPool worker thread inherits SubmoduleClassLoader* as its contextClassLoader because it is created from an akka thread
 # That commonForkJoinPool worker creates another one, so now two workers hold the wrong classLoader
 # As a result, commonForkJoinPool can end up consisting of:
2 workers with SubmoduleClassLoader
8 workers with AppClassLoader
 # Hudi uses a Java parallel stream, which also runs its tasks on commonForkJoinPool
org.apache.hudi.metadata.HoodieBackedTableMetadata:160
{code:java}
return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) : engineContext.parallelize(partitionFileSlices))
    .flatMap((SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
{code}

 # *lookupRecords is called from a commonForkJoinPool worker*
With 2 of the 10 workers holding the wrong classLoader, there is a 20% chance that Thread.currentThread().getContextClassLoader() returns the wrong classLoader
org.apache.hudi.common.table.log.block.HoodieHFileDataBlock:197
{code:java}
inlineConf.setClassLoader(Thread.currentThread().getContextClassLoader());
{code}

 # Class.forName in Hadoop's Configuration throws ClassNotFoundException
org.apache.hadoop.conf.Configuration:2362
{code:java}
clazz = Class.forName(name, true, classLoader);
{code}

 # FileIndex logs the warning, and data skipping does not work
org.apache.hudi.source.FileIndex:272
{code:java}
} catch (Throwable throwable) {
  LOG.warn("Read column stats for data skipping error", throwable);
  return null;
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to