[ 
https://issues.apache.org/jira/browse/HUDI-5151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Trushev updated HUDI-5151:
------------------------------------
    Description: 
h3. The problem

Flink data skipping does not work, and the following warning appears in the log:
{code:java}
7799 [main] WARN  org.apache.hudi.source.FileIndex [] - Read column stats for data skipping error
org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Error occurs when executing map
        ...
Caused by: java.lang.ClassNotFoundException: Class org.apache.hudi.common.fs.inline.InLineFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2329) ~[hadoop-common-2.10.1.jar:?]
        ...
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ~[?:1.8.0_345]
{code}
h3. The solution

This problem has already been addressed by 
https://issues.apache.org/jira/browse/HUDI-3763
but that patch does not fix the Flink job.

We should use InLineFileSystem.class.getClassLoader() instead of 
Thread.currentThread().getContextClassLoader(), because the method 
lookupRecords(keys, fullKey) is called from a commonForkJoinPool worker thread, 
which may carry the wrong contextClassLoader.
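
The difference between the two lookups can be sketched with the JDK alone. This is a minimal illustration, not the Hudi patch: ContextClassLoaderDemo and its methods are hypothetical names, and an empty URLClassLoader with no application parent stands in for a loader, such as SubmoduleClassLoader, that cannot see application classes.

```java
import java.net.URL;
import java.net.URLClassLoader;

class ContextClassLoaderDemo {

    /** Returns true if the given loader can resolve the class name. */
    public static boolean canLoad(String className, ClassLoader loader) {
        try {
            Class.forName(className, true, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** Performs the lookup on a thread whose context class loader is contextLoader. */
    public static boolean canLoadViaContextLoader(String className, ClassLoader contextLoader) {
        boolean[] result = new boolean[1];
        Thread worker = new Thread(() ->
                result[0] = canLoad(className, Thread.currentThread().getContextClassLoader()));
        worker.setContextClassLoader(contextLoader);
        worker.start();
        try {
            worker.join(); // join() makes the worker's write to result[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) throws Exception {
        String name = ContextClassLoaderDemo.class.getName();
        // Assumption: an isolated loader (no URLs, bootstrap parent only) plays the
        // role of a context class loader that does not know application classes.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            // Lookup through the thread's context class loader fails.
            System.out.println(canLoadViaContextLoader(name, isolated)); // false
            // Lookup through the class's own loader does not depend on the thread.
            System.out.println(canLoad(name, ContextClassLoaderDemo.class.getClassLoader())); // true
        }
    }
}
```

The class's own loader is fixed when the class is defined, so the result of the second lookup does not change with the thread that happens to run it.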
h3. When the problem occurs

It is an intermittent bug that is hard to reproduce. Below is the sequence of 
events leading to the error. I can attach a Flink job if necessary.
 # Flink runs the job with a MiniCluster
 # A TemporaryClassLoaderContext containing SubmoduleClassLoader is passed to 
the akka RpcSystem
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils:345
{code:java}
// akka internally caches the context class loader
// make sure it uses the plugin class loader
try (TemporaryClassLoaderContext ignored =
        TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
{code}
 # *All akka threads have SubmoduleClassLoader as their contextClassLoader*
 # *An akka thread creates a CompletableFuture*
org.apache.flink.runtime.rpc.akka.SupervisorActor:206
{code:java}
return Patterns.ask(
                supervisor,
                createStartAkkaRpcActorMessage(propsFactory, endpointId),
                RpcUtils.INF_DURATION)
        .toCompletableFuture()
        .thenApply(SupervisorActor.StartAkkaRpcActorResponse.class::cast)
        .join();
{code}
 # This triggers the creation of a worker thread in commonForkJoinPool
java.util.concurrent.ForkJoinPool:1485
{code:java}
try {
    if (fac != null && (wt = fac.newThread(this)) != null) {
        wt.start();
{code}
 # *The commonForkJoinPool worker thread inherits SubmoduleClassLoader* as its 
contextClassLoader because it is created from an akka thread
 # That commonForkJoinPool worker creates another worker. Now two workers carry 
the wrong classLoader
 # As a result, commonForkJoinPool may consist of, for example:
2 workers with SubmoduleClassLoader
8 workers with AppClassLoader
 # Hudi uses a Java parallel stream, which also runs its tasks on 
commonForkJoinPool
org.apache.hudi.metadata.HoodieBackedTableMetadata:160
{code:java}
return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) : engineContext.parallelize(partitionFileSlices))
    .flatMap((SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
{code}
 # *lookupRecords is called from a commonForkJoinPool worker*
With 2 of 10 workers affected, there is roughly a 20% chance that 
Thread.currentThread().getContextClassLoader() returns the wrong classLoader
org.apache.hudi.common.table.log.block.HoodieHFileDataBlock:197
{code:java}
inlineConf.setClassLoader(Thread.currentThread().getContextClassLoader());
{code}
 # Class.forName in the Hadoop Configuration throws ClassNotFoundException
org.apache.hadoop.conf.Configuration:2362
{code:java}
clazz = Class.forName(name, true, classLoader);
{code}
 # FileIndex logs the warning and returns null, so data skipping does not work
org.apache.hudi.source.FileIndex:272
{code:java}
} catch (Throwable throwable) {
  LOG.warn("Read column stats for data skipping error", throwable);
  return null;
{code}
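
The inheritance step in the sequence above can be reproduced with plain threads. This is a sketch under assumptions, not Flink or Hudi code: InheritedContextLoaderDemo and its methods are hypothetical names, and an empty URLClassLoader stands in for SubmoduleClassLoader. Plain threads are used instead of the common pool because newer JDK releases may assign the context class loader of common-pool workers differently from the 1.8.0_345 runtime in the stack trace. The 20% figure additionally assumes that tasks are spread evenly over the workers.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.atomic.AtomicReference;

class InheritedContextLoaderDemo {

    /** Starts a child thread from the calling thread and returns the child's context class loader. */
    public static ClassLoader contextLoaderOfNewChild() {
        AtomicReference<ClassLoader> seen = new AtomicReference<>();
        Thread child = new Thread(() -> seen.set(Thread.currentThread().getContextClassLoader()));
        child.start();
        join(child);
        return seen.get();
    }

    /** Returns the context class loader seen by a thread created from a parent that carries parentLoader. */
    public static ClassLoader contextLoaderSeenByChildOf(ClassLoader parentLoader) {
        AtomicReference<ClassLoader> seen = new AtomicReference<>();
        Thread parent = new Thread(() -> seen.set(contextLoaderOfNewChild()));
        parent.setContextClassLoader(parentLoader);
        parent.start();
        join(parent);
        return seen.get();
    }

    /** Share of workers that carry the wrong loader, e.g. 2 of 10. */
    public static double wrongLoaderShare(int wrongWorkers, int totalWorkers) {
        return (double) wrongWorkers / totalWorkers;
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: this loader plays the role of SubmoduleClassLoader.
        try (URLClassLoader submoduleLike = new URLClassLoader(new URL[0], null)) {
            // A new thread takes over the context class loader of the thread that creates it.
            System.out.println(contextLoaderSeenByChildOf(submoduleLike) == submoduleLike); // true
        }
        System.out.println(wrongLoaderShare(2, 10)); // 0.2
    }
}
```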

> Flink data skipping doesn't work with ClassNotFoundException of 
> InLineFileSystem
> --------------------------------------------------------------------------------
>
>                 Key: HUDI-5151
>                 URL: https://issues.apache.org/jira/browse/HUDI-5151
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink, metadata
>            Reporter: Alexander Trushev
>            Assignee: Alexander Trushev
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
