[
https://issues.apache.org/jira/browse/HUDI-7307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Y Ethan Guo updated HUDI-7307:
------------------------------
Fix Version/s: 1.0.2
> Record index lookup fails for long running Spark jobs on secured Yarn clusters
> ------------------------------------------------------------------------------
>
> Key: HUDI-7307
> URL: https://issues.apache.org/jira/browse/HUDI-7307
> Project: Apache Hudi
> Issue Type: Bug
> Components: metadata
> Affects Versions: 0.14.0, 0.14.1
> Environment: Java8, Spark 3.x, Hadoop 3.3.x
> Reporter: Zbigniew Baranowski
> Priority: Critical
> Labels: 0.14.0, 0.14.1_candidate
> Fix For: 1.0.2
>
>
> *What is the problem?*
> Writing to Hudi tables with RECORD_INDEX enabled from a Spark job on secured
> Hadoop clusters fails once the job reaches 24h of lifetime (the HDFS delegation
> token has been renewed and the old one has expired) with the following error:
> {code:java}
> 24/01/17 09:55:19 WARN TaskSetManager: Lost task 0.0 in stage 104.0 (TID 244)
> (host1.apache.org executor 1): org.apache.hudi.exception.HoodieException:
> org.apache.hudi.exception.HoodieException: Error occurs when executing map
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
> at
> java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
> at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
> at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> at
> org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:84)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:270)
> at
> org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:296)
> at
> org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
> at
> org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
> at
> org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
> at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
> at
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
> at org.apache.spark.scheduler.Task.run(Task.scala:139)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.hudi.exception.HoodieException: Error occurs when
> executing map
> at
> org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
> at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
> at java.util.stream.AbstractTask.compute(AbstractTask.java:327)
> at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Caused by: org.apache.hudi.exception.HoodieIOException: Failed to scan
> metadata
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:165)
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:155)
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:175)
> at
> org.apache.hudi.common.table.HoodieTableMetaClient.getActiveTimeline(HoodieTableMetaClient.java:355)
> at
> org.apache.hudi.metadata.HoodieTableMetadataUtil.getValidInstantTimestamps(HoodieTableMetadataUtil.java:1264)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:473)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:429)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:414)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:291)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:275)
> at
> org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
> ... 12 more
> Caused by:
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Token for real user: , can't be found in cache
> at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1612)
> at org.apache.hadoop.ipc.Client.call(Client.java:1558)
> at org.apache.hadoop.ipc.Client.call(Client.java:1455)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:242)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:129)
> at com.sun.proxy.$Proxy40.getListing(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:688)
> at sun.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
> at com.sun.proxy.$Proxy41.getListing(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1702)
> at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1686)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1100)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:147)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1175)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1172)
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1182)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
> at
> org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$listStatus$19(HoodieWrapperFileSystem.java:597)
> at
> org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:114)
> at
> org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:596)
> at
> org.apache.hudi.common.table.HoodieTableMetaClient.scanFiles(HoodieTableMetaClient.java:552)
> at
> org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:645)
> at
> org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:628)
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:163)
> ... 22 more
> {code}
> *Affected version:*
> Hudi 0.14.0 and 0.14.1
> Spark 3.4.1 - probably all others too
> *Expected behavior:*
> Since the delegation token is renewed internally by Spark, the job should not
> fail; it should continue to write to the Hudi table without error, just as it
> did before the token expired.
> *Steps to reproduce:*
> On a Kerberos-secured Hadoop cluster run the following snippet:
> {code:java}
> # start spark-shell in secure mode with a keytab and a short
> # spark.security.credentials.renewalRatio period so we don't have to wait hours
> # for delegation token (dt) renewal.
> spark-shell --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1 \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
> --conf
> 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
> \
> --conf
> 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
> --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
> --conf spark.security.credentials.renewalRatio=0.001 \
> --num-executors 1 \
> --executor-memory 2g \
> --keytab user.keytab \
> --principal [email protected]
> # Comments:
> # dt renewed every 86sec
> # keytab is needed to do dt renewal
> # run the snippet within spark-shell
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.DataFrame
> // keep the login token because we will cancel it later
> val token =
> org.apache.hadoop.security.UserGroupInformation.getLoginUser.getCredentials.getToken(new
>
> org.apache.hadoop.io.Text(org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration).getCanonicalServiceName))
> // helper function to create a dataframe
> def getDataFrame(data: Seq[Row]): DataFrame = {
> // Define the schema for the DataFrame
> val schema = StructType(Seq(
> StructField("id", IntegerType, nullable = false),
> StructField("name", StringType, nullable = true),
> StructField("age", IntegerType, nullable = true)
> ))
> // Create a DataFrame from the schema and data
> val df = spark.createDataFrame(spark.sparkContext.parallelize(data),
> schema)
> df
> }
> // helper function to write df to hudi
> def write(dataFrame: DataFrame): Unit = {
> dataFrame.
> write.
> format("org.apache.hudi").
> option("hoodie.table.name", "test").
> option("hoodie.datasource.write.precombine.field", "age").
> option("hoodie.datasource.write.recordkey.field", "id").
> option("hoodie.datasource.write.partitionpath.field", "").
> option("hoodie.datasource.hive_sync.support_timestamp", "true").
> option("hoodie.datasource.hive_sync.enable", "false").
> option("hoodie.datasource.write.reconcile.schema", "true").
> option("hoodie.write.markers.type", "DIRECT").
> option("hoodie.schema.on.read.enable","true").
> option("hoodie.datasource.write.reconcile.schema","true").
> option("hoodie.metadata.enable","true").
> option("hoodie.index.type","RECORD_INDEX").
> option("hoodie.metadata.record.index.enable","true").
> option("hoodie.datasource.write.operation", "UPSERT").
>
> option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.NonpartitionedKeyGenerator").
>
> option("hoodie.datasource.hive_sync.partition_extractor_class","org.apache.hudi.hive.NonPartitionedExtractor").
> mode(org.apache.spark.sql.SaveMode.Append).
> save("/tmp/hudi_test")
> }
> val data1 = Seq(
> Row(1, "Alice", 25),
> Row(2, "Bob", 30),
> Row(3, "Charlie", 22)
> )
> val df1 = getDataFrame(data1);
> val data2 = Seq(
> Row(1, "Alice", 26),
> Row(2, "Bob", 30),
> Row(3, "Charlie", 23)
> )
> val df2 = getDataFrame(data2);
> // write for the first time; this will be successful because the token is
> // still valid
> write(df1)
> // wait for renewal so we can 'safely' cancel the previous token (wait time =
> // day ms * spark.security.credentials.renewalRatio)
> java.lang.Thread.sleep((86400000 *
> spark.conf.get("spark.security.credentials.renewalRatio").toFloat).toLong)
> // cancel the token
> token.cancel(spark.sparkContext.hadoopConfiguration)
> // this should fail some tasks on executors, and the whole job, with "Token for
> // real user: , can't be found in cache"
> write(df1)
> // if it does not fail, run the write again
> write(df2)
> // and again - not all tasks may fail, as this depends on which thread is used
> // for handling metadata access
> write(df1)
> {code}
> *Root cause analysis:*
> Record Index lookup is done via HoodieBackedTableMetadata.getRecordsByKeys
> which uses HoodieLocalEngineContext object on Spark executors.
> HoodieLocalEngineContext leverages java.util.stream to process input data
> in parallel (by calling parallel() in the pipeline).
> Parallelism in java.util.stream is implemented using a ForkJoinPool executor
> pool.
> Once initialized, this pool reuses the same threads for all future calls.
> This is the key to understanding why, even after the HDFS delegation token is
> renewed by Spark, some actions executed in HoodieLocalEngineContext still use
> the old/first token that the Spark executor was started with.
> ForkJoinPool seems to be initialized at the beginning of the spark executor
> lifetime, outside of the Java Access Control Context privilege action (not
> within UserGroupInformation.doAs call). Therefore it only has login user
> credentials/tokens read from token files available to each YARN executor
> during startup - these are valid for the next 24h, so all will be fine for
> the next 24h.
> The normal Spark task thread pool, by contrast, is executed within a Java
> Access Control Context privileged action (as a part of doAs):
> [https://github.com/apache/spark/blob/4a69ce68172dc4bae230e933f30081414ddb50cd/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L413C1-L485C4].
> So in single-threaded execution of a spark task, the thread has the access
> to tokens available within Java Access Control Context (returned by
> UserGroupInformation.getCurrentUser()).
> Finally, the current Spark implementation renews the tokens only
> within the Java Access Control Context:
> [https://github.com/apache/spark/blob/6b1ff22dde1ead51cbf370be6e48a802daae58b6/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L138],
> so credentials held outside of the Java Access Control Context are left
> unchanged - this applies to the ForkJoinPool threads.
> To conclude, because of the way Spark renews the delegation token on
> executors, using parallel java.util.stream pipelines in code delegated
> to executors is +unsafe+ and will cause a failure once the current token has
> expired. So, in general, using HoodieLocalEngineContext is not delegation
> token safe.
> Note that the delegation token issue applies only to secured YARN
> clusters.
> *Potential solutions/workarounds:*
> The following approaches have been tried and confirmed to solve
> the problem:
> 1) Remove all calls to parallel() on java.util.stream in the functions of
> HoodieLocalEngineContext. To fix the index lookup, only the removal in the map
> function is needed.
> 2) Propagate the new credentials/tokens to the ForkJoinPool threads by
> updating the login user credentials:
> {code:java}
> diff --git
> a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
>
> b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
> index 5239490816d..ca78e6d7529 100644
> ---
> a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
> +++
> b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
> @@ -18,6 +18,10 @@
>
> package org.apache.hudi.common.engine;
>
> +import org.apache.hadoop.security.Credentials;
> +import org.apache.hadoop.security.UserGroupInformation;
> +import org.apache.hadoop.security.token.Token;
> +import org.apache.hadoop.security.token.TokenIdentifier;
> import org.apache.hudi.common.config.SerializableConfiguration;
> import org.apache.hudi.common.data.HoodieAccumulator;
> import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
> @@ -34,7 +38,11 @@ import
> org.apache.hudi.common.util.collection.ImmutablePair;
> import org.apache.hudi.common.util.collection.Pair;
>
> import org.apache.hadoop.conf.Configuration;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>
> +import java.io.IOException;
> +import java.util.Collection;
> import java.util.Collections;
> import java.util.Iterator;
> import java.util.List;
> @@ -56,8 +64,17 @@ import static
> org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrap
> */
> public final class HoodieLocalEngineContext extends HoodieEngineContext {
>
> + private static final Logger LOG =
> LoggerFactory.getLogger(HoodieLocalEngineContext.class);
> +
> public HoodieLocalEngineContext(Configuration conf) {
> this(conf, new LocalTaskContextSupplier());
> + try {
> + LOG.debug("Creating HoodieLocalEngineContext ugi tokens: "
> + +
> UserGroupInformation.getCurrentUser().getCredentials().getAllTokens() + "
> config="
> + + conf.toString());
> + } catch (IOException e) {
> + throw new RuntimeException(e);
> + }
> }
>
> public HoodieLocalEngineContext(Configuration conf, TaskContextSupplier
> taskContextSupplier) {
> @@ -81,12 +98,15 @@ public final class HoodieLocalEngineContext extends
> HoodieEngineContext {
>
> @Override
> public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func,
> int parallelism) {
> + checkAndPropagateTokensRenewal();
> return
> data.stream().parallel().map(throwingMapWrapper(func)).collect(toList());
> + //return data.stream().map(throwingMapWrapper(func)).collect(toList());
> }
>
> @Override
> public <I, K, V> List<V> mapToPairAndReduceByKey(
> List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc,
> SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
> + checkAndPropagateTokensRenewal();
> return
> data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
> .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
> .map(list -> list.stream().map(e ->
> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get())
> @@ -97,6 +117,7 @@ public final class HoodieLocalEngineContext extends
> HoodieEngineContext {
> public <I, K, V> Stream<ImmutablePair<K, V>>
> mapPartitionsToPairAndReduceByKey(
> Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V>
> flatMapToPairFunc,
> SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
> + checkAndPropagateTokensRenewal();
> return
> throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
> .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
> .map(entry -> new ImmutablePair<>(entry.getKey(),
> entry.getValue().stream().map(
> @@ -107,6 +128,7 @@ public final class HoodieLocalEngineContext extends
> HoodieEngineContext {
> @Override
> public <I, K, V> List<V> reduceByKey(
> List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int
> parallelism) {
> + checkAndPropagateTokensRenewal();
> return data.stream().parallel()
> .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
> .map(list -> list.stream().map(e ->
> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
> @@ -116,6 +138,7 @@ public final class HoodieLocalEngineContext extends
> HoodieEngineContext {
>
> @Override
> public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I,
> Stream<O>> func, int parallelism) {
> + checkAndPropagateTokensRenewal();
> return
> data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
> }
>
> @@ -170,4 +193,42 @@ public final class HoodieLocalEngineContext extends
> HoodieEngineContext {
> public void cancelAllJobs() {
> // no operation for now
> }
> +
> + private void checkAndPropagateTokensRenewal() {
> + if (!UserGroupInformation.isSecurityEnabled()) {
> + return;
> + }
> +
> + try {
> + UserGroupInformation login = UserGroupInformation.getLoginUser();
> + UserGroupInformation current = UserGroupInformation.getCurrentUser();
> + LOG.debug("Login tokens:" + login.getCredentials().getAllTokens());
> + LOG.debug("Current tokens:" + current.getCredentials().getAllTokens());
> +
> + if (!compareCredentials(current.getCredentials(),
> login.getCredentials())) {
> + LOG.debug("Login and current are different, updating the tokens");
> + login.addCredentials(current.getCredentials());
> + LOG.debug(("Updated login tokens: " +
> login.getCredentials().getAllTokens()));
> + }
> + } catch (IOException e) {
> + throw new RuntimeException(e);
> + }
> + }
> +
> + private static boolean compareCredentials(Credentials credentials1,
> Credentials credentials2) {
> + // Get all the tokens from the first Credentials instance
> + Collection<Token<? extends TokenIdentifier>> tokens1 =
> credentials1.getAllTokens();
> +
> + // Get all the tokens from the second Credentials instance
> + Collection<Token<? extends TokenIdentifier>> tokens2 =
> credentials2.getAllTokens();
> +
> + // Check if the number of tokens is the same
> + if (tokens1.size() != tokens2.size()) {
> + return false;
> + }
> +
> +
> + // All tokens are the same
> + return tokens2.containsAll(tokens1);
> + }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)