[ 
https://issues.apache.org/jira/browse/HUDI-7307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Y Ethan Guo updated HUDI-7307:
------------------------------
    Fix Version/s: 1.0.2

> Record index lookup fails for long running Spark jobs on secured Yarn clusters
> ------------------------------------------------------------------------------
>
>                 Key: HUDI-7307
>                 URL: https://issues.apache.org/jira/browse/HUDI-7307
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: metadata
>    Affects Versions: 0.14.0, 0.14.1
>         Environment: Java8, Spark 3.x, Hadoop 3.3.x
>            Reporter: Zbigniew Baranowski
>            Priority: Critical
>              Labels: 0.14.0, 0.14.1_candidate
>             Fix For: 1.0.2
>
>
> *What is the problem?*
> Writing to Hudi tables with RECORD_INDEX enabled by a Spark job on Hadoop 
> secured clusters fails once the job reaches 24h of lifetime (HDFS delegation 
> token is renewed and the old one is expired) with an error:
> {code:java}
> 24/01/17 09:55:19 WARN TaskSetManager: Lost task 0.0 in stage 104.0 (TID 244) 
> (host1.apache.org executor 1): org.apache.hudi.exception.HoodieException: 
> org.apache.hudi.exception.HoodieException: Error occurs when executing map
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>       at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>       at 
> java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
>       at 
> java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
>       at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
>       at 
> java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714)
>       at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
>       at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>       at 
> org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:84)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:270)
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:296)
>       at 
> org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
>       at 
> org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
>       at 
> org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
>       at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
>       at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
>       at 
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>       at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>       at org.apache.spark.scheduler.Task.run(Task.scala:139)
>       at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.hudi.exception.HoodieException: Error occurs when 
> executing map
>       at 
> org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
>       at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>       at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>       at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>       at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>       at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
>       at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
>       at java.util.stream.AbstractTask.compute(AbstractTask.java:327)
>       at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
>       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>       at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>       at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Caused by: org.apache.hudi.exception.HoodieIOException: Failed to scan 
> metadata
>       at 
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:165)
>       at 
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:155)
>       at 
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:175)
>       at 
> org.apache.hudi.common.table.HoodieTableMetaClient.getActiveTimeline(HoodieTableMetaClient.java:355)
>       at 
> org.apache.hudi.metadata.HoodieTableMetadataUtil.getValidInstantTimestamps(HoodieTableMetadataUtil.java:1264)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:473)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:429)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:414)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:291)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:275)
>       at 
> org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
>       ... 12 more
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>  Token for real user: , can't be found in cache
>       at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1612)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1558)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1455)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:242)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:129)
>       at com.sun.proxy.$Proxy40.getListing(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:688)
>       at sun.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
>       at com.sun.proxy.$Proxy41.getListing(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1702)
>       at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1686)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1100)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:147)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1175)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1172)
>       at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1182)
>       at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
>       at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
>       at 
> org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$listStatus$19(HoodieWrapperFileSystem.java:597)
>       at 
> org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:114)
>       at 
> org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:596)
>       at 
> org.apache.hudi.common.table.HoodieTableMetaClient.scanFiles(HoodieTableMetaClient.java:552)
>       at 
> org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:645)
>       at 
> org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:628)
>       at 
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:163)
>       ... 22 more
> {code}
> *Affected version:*
> Hudi 0.14.0 and 0.14.1
> Spark 3.4.1 - probably all others too
> *Expected behavior:*
> Since the delegation token is internally renewed by spark, the job should not 
> fail and continue to write Hudi table without an error, the same way as is 
> done before a token expiration
> *Steps to reproduce:*
> On a Kerberos-secured Hadoop cluster run the following snippet:
> {code:java}
> # start spark-shell in secure mode with keytab and short 
> spark.security.credentials.renewalRatio period so we don't have to wait hours 
> for dt renewal.
> spark-shell   --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1 \
>    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
>    --conf 
> 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
>  \
>    --conf 
> 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
>    --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
>    --conf spark.security.credentials.renewalRatio=0.001 \
>    --num-executors 1 \
>    --executor-memory 2g \
>    --keytab user.keytab \
>    --principal [email protected]
> # Comments:
> # dt renewed every 86sec
> # keytab is needed to do dt renewal
> # run the snipped within spark-shell
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.DataFrame
> // keep the login token because we will cancel it later
> val token = 
> org.apache.hadoop.security.UserGroupInformation.getLoginUser.getCredentials.getToken(new
>  
> org.apache.hadoop.io.Text(org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration).getCanonicalServiceName))
> // helper function to create a dataframe
> def getDataFrame(data: Seq[Row]): DataFrame = {
>       // Define the schema for the DataFrame
>       val schema = StructType(Seq(
>         StructField("id", IntegerType, nullable = false),
>         StructField("name", StringType, nullable = true),
>         StructField("age", IntegerType, nullable = true)
>       ))
>       // Create a DataFrame from the schema and data
>       val df = spark.createDataFrame(spark.sparkContext.parallelize(data), 
> schema)
>       df
> }
> // helper function to write df to hudi
> def write(dataFrame: DataFrame): Unit = {
>   dataFrame.
>   write.
>   format("org.apache.hudi").
>   option("hoodie.table.name", "test").
>   option("hoodie.datasource.write.precombine.field", "age").
>   option("hoodie.datasource.write.recordkey.field", "id").
>   option("hoodie.datasource.write.partitionpath.field", "").
>   option("hoodie.datasource.hive_sync.support_timestamp", "true").
>   option("hoodie.datasource.hive_sync.enable", "false").
>   option("hoodie.datasource.write.reconcile.schema", "true").
>   option("hoodie.write.markers.type", "DIRECT").
>   option("hoodie.schema.on.read.enable","true").
>   option("hoodie.datasource.write.reconcile.schema","true").
>   option("hoodie.metadata.enable","true").
>   option("hoodie.index.type","RECORD_INDEX").
>   option("hoodie.metadata.record.index.enable","true").
>   option("hoodie.datasource.write.operation", "UPSERT").
>   
> option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.NonpartitionedKeyGenerator").
>   
> option("hoodie.datasource.hive_sync.partition_extractor_class","org.apache.hudi.hive.NonPartitionedExtractor").
>   mode(org.apache.spark.sql.SaveMode.Append).
>   save("/tmp/hudi_test")
> }
> val data1 = Seq(
>   Row(1, "Alice", 25),
>   Row(2, "Bob", 30),
>   Row(3, "Charlie", 22)
> )
> val df1 = getDataFrame(data1);
> val data2 = Seq(
>   Row(1, "Alice", 26),
>   Row(2, "Bob", 30),
>   Row(3, "Charlie", 23)
> )
> val df2 = getDataFrame(data2);
> // write for the first time this will be successfull because the token is 
> still valid
> write(df1)
> // wait for renewal so we can 'safely' cancel the prevoius token (wait time = 
> day ms * spark.security.credentials.renewalRatio )
> java.lang.Thread.sleep((86400000 * 
> spark.conf.get("spark.security.credentials.renewalRatio").toFloat).toLong)
> // cancel the token
> token.cancel(spark.sparkContext.hadoopConfiguration)
> //this should fail some tasks on executors and the whole job with Token for 
> real user: , can't be found in cache
> write(df1)
> // if not run the write again
> write(df2)
> // and again - not all task may fail as this depends which thread is used for 
> handling meta access
> write(df1)
> {code}
> *Root cause analysis:*
> Record Index lookup is done via HoodieBackedTableMetadata.getRecordsByKeys 
> which uses HoodieLocalEngineContext object on Spark executors. 
> HoodieLocalEngineContext is leveraging java.util.stream for input data 
> processing in a parallel fashion (by calling parallel() in the pipeline). 
> Parallelism in java.util.stream is implemented using ForkJoinPool executor 
> pool. 
> This pool once initialized for the first time reuses the same threads for any 
> future call. 
> This is the key to understanding why even if HDFS delegation token is renewed 
> by spark, some actions executed in HoodieLocalEngineContext are still using 
> the old/first token that spark executor was started with. 
> ForkJoinPool seems to be initialized at the beginning of the spark executor 
> lifetime, outside of the Java Access Control Context privilege action (not 
> within UserGroupInformation.doAs call). Therefore it only has login user 
> credentials/tokens read from token files available to each YARN executor 
> during startup - these are valid for the next 24h, so all will be fine for 
> the next 24h.
> Normal Spark task thread pool is expected within Java Access Control Context 
> privilege action(as a part of doAs): 
> [https://github.com/apache/spark/blob/4a69ce68172dc4bae230e933f30081414ddb50cd/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L413C1-L485C4]).
>  So in single-threaded execution of a spark task, the thread has the access 
> to tokens available within Java Access Control Context (returned by 
> UserGroupInformation.getCurrentUser()).
> Finally, the current state of the Spark art makes Spark renew the tokens only 
> within Java Security Context: 
> [https://github.com/apache/spark/blob/6b1ff22dde1ead51cbf370be6e48a802daae58b6/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L138],
>  so those outside of the Java Access Control Context are unchanged - this 
> applies to ForkJoinPool
> To conclude, because of the way how Spark renews the delegation token on 
> executors, using parallel pipelines of java.util.stream in a code delegated 
> to executors is +unsafe+ and will cause a failure once the current token has 
> expired. So in general using of HoodieLocalEngineContext is not delegation 
> token safe.
> Worth noting here that the delegation token business applies only to the 
> YARN-secured cluster
> *Potential solutions/workarounds:*
> The following attempts have been tried and they are confirmed to be solving 
> the problem:
> 1) Remove all calls to parallel() on java.util.stream in functions of 
> HoodieLocalEngineContext. For index lookup fixing only removal in map 
> function is needed
> 2) Populate new credentials/token to ForkJoinPool threads by updating login 
> user credentials:
> {code:java}
> diff --git 
> a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
>  
> b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
> index 5239490816d..ca78e6d7529 100644
> --- 
> a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
> +++ 
> b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
> @@ -18,6 +18,10 @@
>  
>  package org.apache.hudi.common.engine;
>  
> +import org.apache.hadoop.security.Credentials;
> +import org.apache.hadoop.security.UserGroupInformation;
> +import org.apache.hadoop.security.token.Token;
> +import org.apache.hadoop.security.token.TokenIdentifier;
>  import org.apache.hudi.common.config.SerializableConfiguration;
>  import org.apache.hudi.common.data.HoodieAccumulator;
>  import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
> @@ -34,7 +38,11 @@ import 
> org.apache.hudi.common.util.collection.ImmutablePair;
>  import org.apache.hudi.common.util.collection.Pair;
>  
>  import org.apache.hadoop.conf.Configuration;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>  
> +import java.io.IOException;
> +import java.util.Collection;
>  import java.util.Collections;
>  import java.util.Iterator;
>  import java.util.List;
> @@ -56,8 +64,17 @@ import static 
> org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrap
>   */
>  public final class HoodieLocalEngineContext extends HoodieEngineContext {
>  
> +  private static final Logger LOG = 
> LoggerFactory.getLogger(HoodieLocalEngineContext.class);
> +
>    public HoodieLocalEngineContext(Configuration conf) {
>      this(conf, new LocalTaskContextSupplier());
> +    try {
> +      LOG.debug("Creating HoodieLocalEngineContext ugi tokens: "
> +          + 
> UserGroupInformation.getCurrentUser().getCredentials().getAllTokens() + " 
> config="
> +          + conf.toString());
> +    } catch (IOException e) {
> +      throw new RuntimeException(e);
> +    }
>    }
>  
>    public HoodieLocalEngineContext(Configuration conf, TaskContextSupplier 
> taskContextSupplier) {
> @@ -81,12 +98,15 @@ public final class HoodieLocalEngineContext extends 
> HoodieEngineContext {
>  
>    @Override
>    public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, 
> int parallelism) {
> +    checkAndPropagateTokensRenewal();
>      return 
> data.stream().parallel().map(throwingMapWrapper(func)).collect(toList());
> +    //return data.stream().map(throwingMapWrapper(func)).collect(toList());
>    }
>  
>    @Override
>    public <I, K, V> List<V> mapToPairAndReduceByKey(
>        List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, 
> SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
> +    checkAndPropagateTokensRenewal();
>      return 
> data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
>          .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
>          .map(list -> list.stream().map(e -> 
> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get())
> @@ -97,6 +117,7 @@ public final class HoodieLocalEngineContext extends 
> HoodieEngineContext {
>    public <I, K, V> Stream<ImmutablePair<K, V>> 
> mapPartitionsToPairAndReduceByKey(
>        Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> 
> flatMapToPairFunc,
>        SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
> +    checkAndPropagateTokensRenewal();
>      return 
> throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
>          .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
>          .map(entry -> new ImmutablePair<>(entry.getKey(), 
> entry.getValue().stream().map(
> @@ -107,6 +128,7 @@ public final class HoodieLocalEngineContext extends 
> HoodieEngineContext {
>    @Override
>    public <I, K, V> List<V> reduceByKey(
>        List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int 
> parallelism) {
> +    checkAndPropagateTokensRenewal();
>      return data.stream().parallel()
>          .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
>          .map(list -> list.stream().map(e -> 
> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
> @@ -116,6 +138,7 @@ public final class HoodieLocalEngineContext extends 
> HoodieEngineContext {
>  
>    @Override
>    public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, 
> Stream<O>> func, int parallelism) {
> +    checkAndPropagateTokensRenewal();
>      return 
> data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
>    }
>  
> @@ -170,4 +193,42 @@ public final class HoodieLocalEngineContext extends 
> HoodieEngineContext {
>    public void cancelAllJobs() {
>      // no operation for now
>    }
> +
> +  private void checkAndPropagateTokensRenewal() {
> +    if (!UserGroupInformation.isSecurityEnabled()) {
> +      return;
> +    }
> +
> +    try {
> +      UserGroupInformation login = UserGroupInformation.getLoginUser();
> +      UserGroupInformation current = UserGroupInformation.getCurrentUser();
> +      LOG.debug("Login tokens:" + login.getCredentials().getAllTokens());
> +      LOG.debug("Current tokens:" + current.getCredentials().getAllTokens());
> +
> +      if (!compareCredentials(current.getCredentials(), 
> login.getCredentials())) {
> +        LOG.debug("Login and current are different, updating the tokens");
> +        login.addCredentials(current.getCredentials());
> +        LOG.debug(("Updated login tokens: " + 
> login.getCredentials().getAllTokens()));
> +      }
> +    } catch (IOException e) {
> +      throw new RuntimeException(e);
> +    }
> +  }
> +
> +  private static boolean compareCredentials(Credentials credentials1, 
> Credentials credentials2) {
> +    // Get all the tokens from the first Credentials instance
> +    Collection<Token<? extends TokenIdentifier>> tokens1 = 
> credentials1.getAllTokens();
> +
> +    // Get all the tokens from the second Credentials instance
> +    Collection<Token<? extends TokenIdentifier>> tokens2 = 
> credentials2.getAllTokens();
> +
> +    // Check if the number of tokens is the same
> +    if (tokens1.size() != tokens2.size()) {
> +      return false;
> +    }
> +
> +
> +    // All tokens are the same
> +    return tokens2.containsAll(tokens1);
> +  }
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to