[
https://issues.apache.org/jira/browse/HUDI-7307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zbigniew Baranowski updated HUDI-7307:
--------------------------------------
Description:
*What is the problem?*
Writing to Hudi tables with RECORD_INDEX enabled from a Spark job on a secured
Hadoop cluster fails once the job reaches 24h of lifetime (the HDFS delegation
token has been renewed and the old one has expired) with the following error:
{code:java}
24/01/17 09:55:19 WARN TaskSetManager: Lost task 0.0 in stage 104.0 (TID 244)
(host1.apache.org executor 1): org.apache.hudi.exception.HoodieException:
org.apache.hudi.exception.HoodieException: Error occurs when executing map
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
at
java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at
java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at
org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:84)
at
org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:270)
at
org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:296)
at
org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
at
org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
at
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: Error occurs when
executing map
at
org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
at java.util.stream.AbstractTask.compute(AbstractTask.java:327)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.hudi.exception.HoodieIOException: Failed to scan metadata
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:165)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:155)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:175)
at
org.apache.hudi.common.table.HoodieTableMetaClient.getActiveTimeline(HoodieTableMetaClient.java:355)
at
org.apache.hudi.metadata.HoodieTableMetadataUtil.getValidInstantTimestamps(HoodieTableMetadataUtil.java:1264)
at
org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:473)
at
org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:429)
at
org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:414)
at
org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:291)
at
org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:275)
at
org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
... 12 more
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
Token for real user: , can't be found in cache
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1612)
at org.apache.hadoop.ipc.Client.call(Client.java:1558)
at org.apache.hadoop.ipc.Client.call(Client.java:1455)
at
org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:242)
at
org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:129)
at com.sun.proxy.$Proxy40.getListing(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:688)
at sun.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy41.getListing(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1702)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1686)
at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1100)
at
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:147)
at
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1175)
at
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1172)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1182)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
at
org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$listStatus$19(HoodieWrapperFileSystem.java:597)
at
org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:114)
at
org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:596)
at
org.apache.hudi.common.table.HoodieTableMetaClient.scanFiles(HoodieTableMetaClient.java:552)
at
org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:645)
at
org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:628)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:163)
... 22 more
{code}
*Affected version:*
Hudi 0.14.0 and 0.14.1
Spark 3.4.1 - probably all others too
*Expected behavior:*
Since the delegation token is renewed internally by Spark, the job should not
fail; it should continue writing to the Hudi table without an error, the same
way it did before the token expired.
*Steps to reproduce:*
On a Kerberos-secured Hadoop cluster run the following snippet:
{code:java}
# start spark-shell in secure mode with a keytab and a short
# spark.security.credentials.renewalRatio period so we don't have to wait hours
# for dt renewal.
spark-shell --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf
'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
\
--conf
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
--conf spark.security.credentials.renewalRatio=0.001 \
--num-executors 1 \
--executor-memory 2g \
--keytab user.keytab \
--principal [email protected]
# Comments:
# dt renewed every 86sec
# keytab is needed to do dt renewal
# run the snippet within spark-shell
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
// keep the login token because we will cancel it later
val token =
org.apache.hadoop.security.UserGroupInformation.getLoginUser.getCredentials.getToken(new
org.apache.hadoop.io.Text(org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration).getCanonicalServiceName))
// helper function to create a dataframe
def getDataFrame(data: Seq[Row]): DataFrame = {
// Define the schema for the DataFrame
val schema = StructType(Seq(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true)
))
// Create a DataFrame from the schema and data
val df = spark.createDataFrame(spark.sparkContext.parallelize(data),
schema)
df
}
// helper function to write df to hudi
def write(dataFrame: DataFrame): Unit = {
dataFrame.
write.
format("org.apache.hudi").
option("hoodie.table.name", "test").
option("hoodie.datasource.write.precombine.field", "age").
option("hoodie.datasource.write.recordkey.field", "id").
option("hoodie.datasource.write.partitionpath.field", "").
option("hoodie.datasource.hive_sync.support_timestamp", "true").
option("hoodie.datasource.hive_sync.enable", "false").
option("hoodie.datasource.write.reconcile.schema", "true").
option("hoodie.write.markers.type", "DIRECT").
option("hoodie.schema.on.read.enable","true").
option("hoodie.datasource.write.reconcile.schema","true").
option("hoodie.metadata.enable","true").
option("hoodie.index.type","RECORD_INDEX").
option("hoodie.metadata.record.index.enable","true").
option("hoodie.datasource.write.operation", "UPSERT").
option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.datasource.hive_sync.partition_extractor_class","org.apache.hudi.hive.NonPartitionedExtractor").
mode(org.apache.spark.sql.SaveMode.Append).
save("/tmp/hudi_test")
}
val data1 = Seq(
Row(1, "Alice", 25),
Row(2, "Bob", 30),
Row(3, "Charlie", 22)
)
val df1 = getDataFrame(data1);
val data2 = Seq(
Row(1, "Alice", 26),
Row(2, "Bob", 30),
Row(3, "Charlie", 23)
)
val df2 = getDataFrame(data2);
// write for the first time; this will be successful because the token is still
// valid
write(df1)
// wait for renewal so we can 'safely' cancel the previous token (wait time =
// day ms * spark.security.credentials.renewalRatio)
java.lang.Thread.sleep((86400000 *
spark.conf.get("spark.security.credentials.renewalRatio").toFloat).toLong)
// cancel the token
token.cancel(spark.sparkContext.hadoopConfiguration)
// this should fail some tasks on executors, and the whole job, with "Token for
// real user: , can't be found in cache"
write(df1)
// if it did not fail, run the write again
write(df2)
// and again - not all tasks may fail, as this depends on which thread is used for
// handling meta access
write(df1)
{code}
*Root cause analysis:*
Record Index lookup is done via HoodieBackedTableMetadata.getRecordsByKeys
which uses HoodieLocalEngineContext object on Spark executors.
HoodieLocalEngineContext is leveraging java.util.stream for input data
processing in a parallel fashion (by calling parallel() in the pipeline).
Parallelism in java.util.stream is implemented using ForkJoinPool executor
pool.
Once initialized for the first time, this pool reuses the same threads for
every future call.
This is the key to understanding why even if HDFS delegation token is renewed
by spark, some actions executed in HoodieLocalEngineContext are still using the
old/first token that spark executor was started with.
ForkJoinPool seems to be initialized at the beginning of the spark executor
lifetime, outside of the Java Access Control Context privilege action (not
within UserGroupInformation.doAs call). Therefore it only has login user
credentials/tokens read from token files available to each YARN executor during
startup - these are valid for the next 24h, so all will be fine for the next
24h.
The normal Spark task thread pool is expected to run within a Java Access Control
Context privileged action (as a part of doAs):
[https://github.com/apache/spark/blob/4a69ce68172dc4bae230e933f30081414ddb50cd/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L413C1-L485C4].
So in single-threaded execution of a Spark task, the thread has access to the
tokens available within the Java Access Control Context (returned by
UserGroupInformation.getCurrentUser()).
Finally, in its current implementation Spark renews the tokens only
within the Java Security Context:
[https://github.com/apache/spark/blob/6b1ff22dde1ead51cbf370be6e48a802daae58b6/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L138],
so tokens held outside of the Java Access Control Context remain unchanged - this
applies to the ForkJoinPool threads.
To conclude, because of the way Spark renews the delegation token on
executors, using parallel pipelines of java.util.stream in code delegated to
executors is +unsafe+ and will cause a failure once the current token has
expired. So, in general, using HoodieLocalEngineContext is not delegation-token
safe.
It is worth noting that this delegation token issue applies only to
YARN-secured clusters.
*Potential solutions/workarounds:*
The following approaches have been tried and are confirmed to solve the
problem:
1) Remove all calls to parallel() on java.util.stream in the functions of
HoodieLocalEngineContext. To fix the index lookup alone, only the removal in the
map function is needed.
2) Propagate the new credentials/tokens to the ForkJoinPool threads by updating
the login user's credentials:
{code:java}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index 5239490816d..ca78e6d7529 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -18,6 +18,10 @@
package org.apache.hudi.common.engine;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
@@ -34,7 +38,11 @@ import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -56,8 +64,17 @@ import static
org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrap
*/
public final class HoodieLocalEngineContext extends HoodieEngineContext {
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieLocalEngineContext.class);
+
public HoodieLocalEngineContext(Configuration conf) {
this(conf, new LocalTaskContextSupplier());
+ try {
+ LOG.debug("Creating HoodieLocalEngineContext ugi tokens: "
+ +
UserGroupInformation.getCurrentUser().getCredentials().getAllTokens() + "
config="
+ + conf.toString());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
public HoodieLocalEngineContext(Configuration conf, TaskContextSupplier
taskContextSupplier) {
@@ -81,12 +98,15 @@ public final class HoodieLocalEngineContext extends
HoodieEngineContext {
@Override
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int
parallelism) {
+ checkAndPropagateTokensRenewal();
return
data.stream().parallel().map(throwingMapWrapper(func)).collect(toList());
+ //return data.stream().map(throwingMapWrapper(func)).collect(toList());
}
@Override
public <I, K, V> List<V> mapToPairAndReduceByKey(
List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc,
SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+ checkAndPropagateTokensRenewal();
return
data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
.collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
.map(list -> list.stream().map(e ->
e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get())
@@ -97,6 +117,7 @@ public final class HoodieLocalEngineContext extends
HoodieEngineContext {
public <I, K, V> Stream<ImmutablePair<K, V>>
mapPartitionsToPairAndReduceByKey(
Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V>
flatMapToPairFunc,
SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+ checkAndPropagateTokensRenewal();
return
throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
.collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
.map(entry -> new ImmutablePair<>(entry.getKey(),
entry.getValue().stream().map(
@@ -107,6 +128,7 @@ public final class HoodieLocalEngineContext extends
HoodieEngineContext {
@Override
public <I, K, V> List<V> reduceByKey(
List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int
parallelism) {
+ checkAndPropagateTokensRenewal();
return data.stream().parallel()
.collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
.map(list -> list.stream().map(e ->
e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
@@ -116,6 +138,7 @@ public final class HoodieLocalEngineContext extends
HoodieEngineContext {
@Override
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I,
Stream<O>> func, int parallelism) {
+ checkAndPropagateTokensRenewal();
return
data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
}
@@ -170,4 +193,42 @@ public final class HoodieLocalEngineContext extends
HoodieEngineContext {
public void cancelAllJobs() {
// no operation for now
}
+
+ private void checkAndPropagateTokensRenewal() {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+
+ try {
+ UserGroupInformation login = UserGroupInformation.getLoginUser();
+ UserGroupInformation current = UserGroupInformation.getCurrentUser();
+ LOG.debug("Login tokens:" + login.getCredentials().getAllTokens());
+ LOG.debug("Current tokens:" + current.getCredentials().getAllTokens());
+
+ if (!compareCredentials(current.getCredentials(),
login.getCredentials())) {
+ LOG.debug("Login and current are different, updating the tokens");
+ login.addCredentials(current.getCredentials());
+ LOG.debug(("Updated login tokens: " +
login.getCredentials().getAllTokens()));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static boolean compareCredentials(Credentials credentials1,
Credentials credentials2) {
+ // Get all the tokens from the first Credentials instance
+ Collection<Token<? extends TokenIdentifier>> tokens1 =
credentials1.getAllTokens();
+
+ // Get all the tokens from the second Credentials instance
+ Collection<Token<? extends TokenIdentifier>> tokens2 =
credentials2.getAllTokens();
+
+ // Check if the number of tokens is the same
+ if (tokens1.size() != tokens2.size()) {
+ return false;
+ }
+
+
+ // All tokens are the same
+ return tokens2.containsAll(tokens1);
+ }
}
{code}
was:
*What is the problem?*
Writing to Hudi tables with RECORD_INDEX enabled by a Spark job on Hadoop
secured clusters fails once the job reaches 24h of lifetime (HDFS delegation
token is renewed and the old one is expired) with an error:
{code:java}
24/01/17 09:55:19 WARN TaskSetManager: Lost task 0.0 in stage 104.0 (TID 244)
(bgswrkr02-rci.bigeys.priv executor 1):
org.apache.hudi.exception.HoodieException:
org.apache.hudi.exception.HoodieException: Error occurs when executing map
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
at
java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at
java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at
org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:84)
at
org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:270)
at
org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:296)
at
org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
at
org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
at
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: Error occurs when
executing map
at
org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
at java.util.stream.AbstractTask.compute(AbstractTask.java:327)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.hudi.exception.HoodieIOException: Failed to scan metadata
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:165)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:155)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:175)
at
org.apache.hudi.common.table.HoodieTableMetaClient.getActiveTimeline(HoodieTableMetaClient.java:355)
at
org.apache.hudi.metadata.HoodieTableMetadataUtil.getValidInstantTimestamps(HoodieTableMetadataUtil.java:1264)
at
org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:473)
at
org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:429)
at
org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:414)
at
org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:291)
at
org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:275)
at
org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
... 12 more
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
Token for real user: , can't be found in cache
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1612)
at org.apache.hadoop.ipc.Client.call(Client.java:1558)
at org.apache.hadoop.ipc.Client.call(Client.java:1455)
at
org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:242)
at
org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:129)
at com.sun.proxy.$Proxy40.getListing(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:688)
at sun.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy41.getListing(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1702)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1686)
at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1100)
at
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:147)
at
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1175)
at
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1172)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1182)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
at
org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$listStatus$19(HoodieWrapperFileSystem.java:597)
at
org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:114)
at
org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:596)
at
org.apache.hudi.common.table.HoodieTableMetaClient.scanFiles(HoodieTableMetaClient.java:552)
at
org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:645)
at
org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:628)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:163)
... 22 more
{code}
*Affected version:*
Hudi 0.14.0 and 0.14.1
Spark 3.4.1 - probably all others too
*Expected behavior:*
Since the delegation token is internally renewed by spark, the job should not
fail and continue to write Hudi table without an error, the same way as is done
before a token expiration
*Steps to reproduce:*
On a Kerberos-secured Hadoop cluster run the following snippet:
{code:java}
# start spark-shell in secure mode with keytab and short
spark.security.credentials.renewalRatio period so we don't have to wait hours
for dt renewal.
spark-shell --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf
'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
\
--conf
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
--conf spark.security.credentials.renewalRatio=0.001 \
--num-executors 1 \
--executor-memory 2g \
--keytab user.keytab \
--principal [email protected]
# Comments:
# dt renewed every 86sec
# keytab is needed to do dt renewal
# run the snipped within spark-shell
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
// keep the login token because we will cancel it later
val token =
org.apache.hadoop.security.UserGroupInformation.getLoginUser.getCredentials.getToken(new
org.apache.hadoop.io.Text(org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration).getCanonicalServiceName))
// helper function to create a dataframe
def getDataFrame(data: Seq[Row]): DataFrame = {
// Define the schema for the DataFrame
val schema = StructType(Seq(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true)
))
// Create a DataFrame from the schema and data
val df = spark.createDataFrame(spark.sparkContext.parallelize(data),
schema)
df
}
// helper function to write df to hudi
def write(dataFrame: DataFrame): Unit = {
dataFrame.
write.
format("org.apache.hudi").
option("hoodie.table.name", "test").
option("hoodie.datasource.write.precombine.field", "age").
option("hoodie.datasource.write.recordkey.field", "id").
option("hoodie.datasource.write.partitionpath.field", "").
option("hoodie.datasource.hive_sync.support_timestamp", "true").
option("hoodie.datasource.hive_sync.enable", "false").
option("hoodie.datasource.write.reconcile.schema", "true").
option("hoodie.write.markers.type", "DIRECT").
option("hoodie.schema.on.read.enable","true").
option("hoodie.datasource.write.reconcile.schema","true").
option("hoodie.metadata.enable","true").
option("hoodie.index.type","RECORD_INDEX").
option("hoodie.metadata.record.index.enable","true").
option("hoodie.datasource.write.operation", "UPSERT").
option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.datasource.hive_sync.partition_extractor_class","org.apache.hudi.hive.NonPartitionedExtractor").
mode(org.apache.spark.sql.SaveMode.Append).
save("/tmp/hudi_test")
}
val data1 = Seq(
Row(1, "Alice", 25),
Row(2, "Bob", 30),
Row(3, "Charlie", 22)
)
val df1 = getDataFrame(data1);
val data2 = Seq(
Row(1, "Alice", 26),
Row(2, "Bob", 30),
Row(3, "Charlie", 23)
)
val df2 = getDataFrame(data2);
// write for the first time; this will be successful because the token is still
valid
write(df1)
// wait for renewal so we can 'safely' cancel the previous token (wait time =
day ms * spark.security.credentials.renewalRatio )
java.lang.Thread.sleep((86400000 *
spark.conf.get("spark.security.credentials.renewalRatio").toFloat).toLong)
// cancel the token
token.cancel(spark.sparkContext.hadoopConfiguration)
//this should fail some tasks on executors and the whole job with Token for
real user: , can't be found in cache
write(df1)
// if not run the write again
write(df2)
// and again - not all task may fail as this depends which thread is used for
handling meta access
write(df1)
{code}
*Root cause analysis:*
Record Index lookup is done via HoodieBackedTableMetadata.getRecordsByKeys
which uses HoodieLocalEngineContext object on Spark executors.
HoodieLocalEngineContext is leveraging java.util.stream for input data
processing in a parallel fashion (by calling parallel() in the pipeline).
Parallelism in java.util.stream is implemented using ForkJoinPool executor
pool.
Once initialized for the first time, this pool reuses the same threads for all
future calls.
This is the key to understanding why even if HDFS delegation token is renewed
by spark, some actions executed in HoodieLocalEngineContext are still using the
old/first token that spark executor was started with.
ForkJoinPool seems to be initialized at the beginning of the Spark executor
lifetime, outside of the Java Access Control Context privileged action (i.e.
not within a UserGroupInformation.doAs call). Therefore its threads only have
the login user credentials/tokens read from the token files available to each
YARN executor during startup. These tokens are valid for 24h, so everything
works fine for the first 24h.
The normal Spark task thread pool is expected to run within a Java Access
Control Context privileged action (as part of doAs):
[https://github.com/apache/spark/blob/4a69ce68172dc4bae230e933f30081414ddb50cd/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L413C1-L485C4].
So in single-threaded execution of a Spark task, the thread has access to the
tokens available within the Java Access Control Context (returned by
UserGroupInformation.getCurrentUser()).
Finally, Spark currently renews the tokens only within the Java Access Control
Context:
[https://github.com/apache/spark/blob/6b1ff22dde1ead51cbf370be6e48a802daae58b6/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L138],
so credentials held outside of the Java Access Control Context are unchanged -
this applies to the ForkJoinPool threads
To conclude, because of the way Spark renews the delegation token on
executors, using parallel java.util.stream pipelines in code that runs on
executors is +unsafe+ and will cause a failure once the current token has
expired. So, in general, using HoodieLocalEngineContext is not delegation-token
safe.
It is worth noting that this delegation token issue applies only to secured
YARN clusters
*Potential solutions/workarounds:*
The following approaches have been tried and are confirmed to solve the
problem:
1) Remove all calls to parallel() on java.util.stream in the functions of
HoodieLocalEngineContext. To fix the index lookup, only the removal in the map
function is needed
2) Propagate the new credentials/tokens to the ForkJoinPool threads by updating
the login user credentials:
{code:java}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index 5239490816d..ca78e6d7529 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -18,6 +18,10 @@
package org.apache.hudi.common.engine;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
@@ -34,7 +38,11 @@ import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -56,8 +64,17 @@ import static
org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrap
*/
public final class HoodieLocalEngineContext extends HoodieEngineContext {
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieLocalEngineContext.class);
+
public HoodieLocalEngineContext(Configuration conf) {
this(conf, new LocalTaskContextSupplier());
+ try {
+ LOG.debug("Creating HoodieLocalEngineContext ugi tokens: "
+ +
UserGroupInformation.getCurrentUser().getCredentials().getAllTokens() + "
config="
+ + conf.toString());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
public HoodieLocalEngineContext(Configuration conf, TaskContextSupplier
taskContextSupplier) {
@@ -81,12 +98,15 @@ public final class HoodieLocalEngineContext extends
HoodieEngineContext {
@Override
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int
parallelism) {
+ checkAndPropagateTokensRenewal();
return
data.stream().parallel().map(throwingMapWrapper(func)).collect(toList());
+ //return data.stream().map(throwingMapWrapper(func)).collect(toList());
}
@Override
public <I, K, V> List<V> mapToPairAndReduceByKey(
List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc,
SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+ checkAndPropagateTokensRenewal();
return
data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
.collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
.map(list -> list.stream().map(e ->
e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get())
@@ -97,6 +117,7 @@ public final class HoodieLocalEngineContext extends
HoodieEngineContext {
public <I, K, V> Stream<ImmutablePair<K, V>>
mapPartitionsToPairAndReduceByKey(
Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V>
flatMapToPairFunc,
SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+ checkAndPropagateTokensRenewal();
return
throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
.collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
.map(entry -> new ImmutablePair<>(entry.getKey(),
entry.getValue().stream().map(
@@ -107,6 +128,7 @@ public final class HoodieLocalEngineContext extends
HoodieEngineContext {
@Override
public <I, K, V> List<V> reduceByKey(
List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int
parallelism) {
+ checkAndPropagateTokensRenewal();
return data.stream().parallel()
.collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
.map(list -> list.stream().map(e ->
e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
@@ -116,6 +138,7 @@ public final class HoodieLocalEngineContext extends
HoodieEngineContext {
@Override
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I,
Stream<O>> func, int parallelism) {
+ checkAndPropagateTokensRenewal();
return
data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
}
@@ -170,4 +193,42 @@ public final class HoodieLocalEngineContext extends
HoodieEngineContext {
public void cancelAllJobs() {
// no operation for now
}
+
+ private void checkAndPropagateTokensRenewal() {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+
+ try {
+ UserGroupInformation login = UserGroupInformation.getLoginUser();
+ UserGroupInformation current = UserGroupInformation.getCurrentUser();
+ LOG.debug("Login tokens:" + login.getCredentials().getAllTokens());
+ LOG.debug("Current tokens:" + current.getCredentials().getAllTokens());
+
+ if (!compareCredentials(current.getCredentials(),
login.getCredentials())) {
+ LOG.debug("Login and current are different, updating the tokens");
+ login.addCredentials(current.getCredentials());
+ LOG.debug(("Updated login tokens: " +
login.getCredentials().getAllTokens()));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static boolean compareCredentials(Credentials credentials1,
Credentials credentials2) {
+ // Get all the tokens from the first Credentials instance
+ Collection<Token<? extends TokenIdentifier>> tokens1 =
credentials1.getAllTokens();
+
+ // Get all the tokens from the second Credentials instance
+ Collection<Token<? extends TokenIdentifier>> tokens2 =
credentials2.getAllTokens();
+
+ // Check if the number of tokens is the same
+ if (tokens1.size() != tokens2.size()) {
+ return false;
+ }
+
+
+ // All tokens are the same
+ return tokens2.containsAll(tokens1);
+ }
}
{code}
> Record index lookup fails for long running Spark jobs on secured Yarn clusters
> ------------------------------------------------------------------------------
>
> Key: HUDI-7307
> URL: https://issues.apache.org/jira/browse/HUDI-7307
> Project: Apache Hudi
> Issue Type: Bug
> Components: metadata
> Affects Versions: 0.14.0, 0.14.1
> Environment: Java8, Spark 3.x, Hadoop 3.3.x
> Reporter: Zbigniew Baranowski
> Priority: Critical
> Labels: 0.14.0, 0.14.1_candidate
>
> *What is the problem?*
> Writing to Hudi tables with RECORD_INDEX enabled by a Spark job on Hadoop
> secured clusters fails once the job reaches 24h of lifetime (HDFS delegation
> token is renewed and the old one is expired) with an error:
> {code:java}
> 24/01/17 09:55:19 WARN TaskSetManager: Lost task 0.0 in stage 104.0 (TID 244)
> (host1.apache.org executor 1): org.apache.hudi.exception.HoodieException:
> org.apache.hudi.exception.HoodieException: Error occurs when executing map
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
> at
> java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
> at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
> at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> at
> org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:84)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:270)
> at
> org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:296)
> at
> org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
> at
> org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
> at
> org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
> at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
> at
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
> at org.apache.spark.scheduler.Task.run(Task.scala:139)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.hudi.exception.HoodieException: Error occurs when
> executing map
> at
> org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
> at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
> at java.util.stream.AbstractTask.compute(AbstractTask.java:327)
> at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Caused by: org.apache.hudi.exception.HoodieIOException: Failed to scan
> metadata
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:165)
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:155)
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:175)
> at
> org.apache.hudi.common.table.HoodieTableMetaClient.getActiveTimeline(HoodieTableMetaClient.java:355)
> at
> org.apache.hudi.metadata.HoodieTableMetadataUtil.getValidInstantTimestamps(HoodieTableMetadataUtil.java:1264)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:473)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:429)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:414)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:291)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:275)
> at
> org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
> ... 12 more
> Caused by:
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Token for real user: , can't be found in cache
> at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1612)
> at org.apache.hadoop.ipc.Client.call(Client.java:1558)
> at org.apache.hadoop.ipc.Client.call(Client.java:1455)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:242)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:129)
> at com.sun.proxy.$Proxy40.getListing(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:688)
> at sun.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
> at com.sun.proxy.$Proxy41.getListing(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1702)
> at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1686)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1100)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:147)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1175)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1172)
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1182)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
> at
> org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$listStatus$19(HoodieWrapperFileSystem.java:597)
> at
> org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:114)
> at
> org.apache.hudi.common.fs.HoodieWrapperFileSystem.listStatus(HoodieWrapperFileSystem.java:596)
> at
> org.apache.hudi.common.table.HoodieTableMetaClient.scanFiles(HoodieTableMetaClient.java:552)
> at
> org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:645)
> at
> org.apache.hudi.common.table.HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(HoodieTableMetaClient.java:628)
> at
> org.apache.hudi.common.table.timeline.HoodieActiveTimeline.<init>(HoodieActiveTimeline.java:163)
> ... 22 more
> {code}
> *Affected version:*
> Hudi 0.14.0 and 0.14.1
> Spark 3.4.1 - probably all others too
> *Expected behavior:*
> Since the delegation token is internally renewed by spark, the job should not
> fail and continue to write Hudi table without an error, the same way as is
> done before a token expiration
> *Steps to reproduce:*
> On a Kerberos-secured Hadoop cluster run the following snippet:
> {code:java}
> # start spark-shell in secure mode with keytab and short
> spark.security.credentials.renewalRatio period so we don't have to wait hours
> for dt renewal.
> spark-shell --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1 \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
> --conf
> 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
> \
> --conf
> 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
> --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
> --conf spark.security.credentials.renewalRatio=0.001 \
> --num-executors 1 \
> --executor-memory 2g \
> --keytab user.keytab \
> --principal [email protected]
> # Comments:
> # dt renewed every 86sec
> # keytab is needed to do dt renewal
> # run the snipped within spark-shell
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.DataFrame
> // keep the login token because we will cancel it later
> val token =
> org.apache.hadoop.security.UserGroupInformation.getLoginUser.getCredentials.getToken(new
>
> org.apache.hadoop.io.Text(org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration).getCanonicalServiceName))
> // helper function to create a dataframe
> def getDataFrame(data: Seq[Row]): DataFrame = {
> // Define the schema for the DataFrame
> val schema = StructType(Seq(
> StructField("id", IntegerType, nullable = false),
> StructField("name", StringType, nullable = true),
> StructField("age", IntegerType, nullable = true)
> ))
> // Create a DataFrame from the schema and data
> val df = spark.createDataFrame(spark.sparkContext.parallelize(data),
> schema)
> df
> }
> // helper function to write df to hudi
> def write(dataFrame: DataFrame): Unit = {
> dataFrame.
> write.
> format("org.apache.hudi").
> option("hoodie.table.name", "test").
> option("hoodie.datasource.write.precombine.field", "age").
> option("hoodie.datasource.write.recordkey.field", "id").
> option("hoodie.datasource.write.partitionpath.field", "").
> option("hoodie.datasource.hive_sync.support_timestamp", "true").
> option("hoodie.datasource.hive_sync.enable", "false").
> option("hoodie.datasource.write.reconcile.schema", "true").
> option("hoodie.write.markers.type", "DIRECT").
> option("hoodie.schema.on.read.enable","true").
> option("hoodie.datasource.write.reconcile.schema","true").
> option("hoodie.metadata.enable","true").
> option("hoodie.index.type","RECORD_INDEX").
> option("hoodie.metadata.record.index.enable","true").
> option("hoodie.datasource.write.operation", "UPSERT").
>
> option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.NonpartitionedKeyGenerator").
>
> option("hoodie.datasource.hive_sync.partition_extractor_class","org.apache.hudi.hive.NonPartitionedExtractor").
> mode(org.apache.spark.sql.SaveMode.Append).
> save("/tmp/hudi_test")
> }
> val data1 = Seq(
> Row(1, "Alice", 25),
> Row(2, "Bob", 30),
> Row(3, "Charlie", 22)
> )
> val df1 = getDataFrame(data1);
> val data2 = Seq(
> Row(1, "Alice", 26),
> Row(2, "Bob", 30),
> Row(3, "Charlie", 23)
> )
> val df2 = getDataFrame(data2);
> // write for the first time this will be successfull because the token is
> still valid
> write(df1)
> // wait for renewal so we can 'safely' cancel the prevoius token (wait time =
> day ms * spark.security.credentials.renewalRatio )
> java.lang.Thread.sleep((86400000 *
> spark.conf.get("spark.security.credentials.renewalRatio").toFloat).toLong)
> // cancel the token
> token.cancel(spark.sparkContext.hadoopConfiguration)
> //this should fail some tasks on executors and the whole job with Token for
> real user: , can't be found in cache
> write(df1)
> // if not run the write again
> write(df2)
> // and again - not all task may fail as this depends which thread is used for
> handling meta access
> write(df1)
> {code}
> *Root cause analysis:*
> Record Index lookup is done via HoodieBackedTableMetadata.getRecordsByKeys
> which uses HoodieLocalEngineContext object on Spark executors.
> HoodieLocalEngineContext is leveraging java.util.stream for input data
> processing in a parallel fashion (by calling parallel() in the pipeline).
> Parallelism in java.util.stream is implemented using ForkJoinPool executor
> pool.
> This pool once initialized for the first time reuses the same threads for any
> future call.
> This is the key to understanding why even if HDFS delegation token is renewed
> by spark, some actions executed in HoodieLocalEngineContext are still using
> the old/first token that spark executor was started with.
> ForkJoinPool seems to be initialized at the beginning of the spark executor
> lifetime, outside of the Java Access Control Context privilege action (not
> within UserGroupInformation.doAs call). Therefore it only has login user
> credentials/tokens read from token files available to each YARN executor
> during startup - these are valid for the next 24h, so all will be fine for
> the next 24h.
> Normal Spark task thread pool is expected within Java Access Control Context
> privilege action(as a part of doAs):
> [https://github.com/apache/spark/blob/4a69ce68172dc4bae230e933f30081414ddb50cd/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L413C1-L485C4]).
> So in single-threaded execution of a spark task, the thread has the access
> to tokens available within Java Access Control Context (returned by
> UserGroupInformation.getCurrentUser()).
> Finally, the current state of the Spark art makes Spark renew the tokens only
> within Java Security Context:
> [https://github.com/apache/spark/blob/6b1ff22dde1ead51cbf370be6e48a802daae58b6/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L138],
> so those outside of the Java Access Control Context are unchanged - this
> applies to ForkJoinPool
> To conclude, because of the way how Spark renews the delegation token on
> executors, using parallel pipelines of java.util.stream in a code delegated
> to executors is +unsafe+ and will cause a failure once the current token has
> expired. So in general using of HoodieLocalEngineContext is not delegation
> token safe.
> Worth noting here that the delegation token business applies only to the
> YARN-secured cluster
> *Potential solutions/workarounds:*
> The following attempts have been tried and they are confirmed to be solving
> the problem:
> 1) Remove all calls to parallel() on java.util.stream in functions of
> HoodieLocalEngineContext. For index lookup fixing only removal in map
> function is needed
> 2) Populate new credentials/token to ForkJoinPool threads by updating login
> user credentials:
> {code:java}
> diff --git
> a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
>
> b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
> index 5239490816d..ca78e6d7529 100644
> ---
> a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
> +++
> b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
> @@ -18,6 +18,10 @@
>
> package org.apache.hudi.common.engine;
>
> +import org.apache.hadoop.security.Credentials;
> +import org.apache.hadoop.security.UserGroupInformation;
> +import org.apache.hadoop.security.token.Token;
> +import org.apache.hadoop.security.token.TokenIdentifier;
> import org.apache.hudi.common.config.SerializableConfiguration;
> import org.apache.hudi.common.data.HoodieAccumulator;
> import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
> @@ -34,7 +38,11 @@ import
> org.apache.hudi.common.util.collection.ImmutablePair;
> import org.apache.hudi.common.util.collection.Pair;
>
> import org.apache.hadoop.conf.Configuration;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>
> +import java.io.IOException;
> +import java.util.Collection;
> import java.util.Collections;
> import java.util.Iterator;
> import java.util.List;
> @@ -56,8 +64,17 @@ import static
> org.apache.hudi.common.function.FunctionWrapper.throwingReduceWrap
> */
> public final class HoodieLocalEngineContext extends HoodieEngineContext {
>
> + private static final Logger LOG =
> LoggerFactory.getLogger(HoodieLocalEngineContext.class);
> +
> public HoodieLocalEngineContext(Configuration conf) {
> this(conf, new LocalTaskContextSupplier());
> + try {
> + LOG.debug("Creating HoodieLocalEngineContext ugi tokens: "
> + +
> UserGroupInformation.getCurrentUser().getCredentials().getAllTokens() + "
> config="
> + + conf.toString());
> + } catch (IOException e) {
> + throw new RuntimeException(e);
> + }
> }
>
> public HoodieLocalEngineContext(Configuration conf, TaskContextSupplier
> taskContextSupplier) {
> @@ -81,12 +98,15 @@ public final class HoodieLocalEngineContext extends
> HoodieEngineContext {
>
> @Override
> public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func,
> int parallelism) {
> + checkAndPropagateTokensRenewal();
> return
> data.stream().parallel().map(throwingMapWrapper(func)).collect(toList());
> + //return data.stream().map(throwingMapWrapper(func)).collect(toList());
> }
>
> @Override
> public <I, K, V> List<V> mapToPairAndReduceByKey(
> List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc,
> SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
> + checkAndPropagateTokensRenewal();
> return
> data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
> .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
> .map(list -> list.stream().map(e ->
> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).get())
> @@ -97,6 +117,7 @@ public final class HoodieLocalEngineContext extends
> HoodieEngineContext {
> public <I, K, V> Stream<ImmutablePair<K, V>>
> mapPartitionsToPairAndReduceByKey(
> Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V>
> flatMapToPairFunc,
> SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
> + checkAndPropagateTokensRenewal();
> return
> throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
> .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
> .map(entry -> new ImmutablePair<>(entry.getKey(),
> entry.getValue().stream().map(
> @@ -107,6 +128,7 @@ public final class HoodieLocalEngineContext extends
> HoodieEngineContext {
> @Override
> public <I, K, V> List<V> reduceByKey(
> List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int
> parallelism) {
> + checkAndPropagateTokensRenewal();
> return data.stream().parallel()
> .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
> .map(list -> list.stream().map(e ->
> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
> @@ -116,6 +138,7 @@ public final class HoodieLocalEngineContext extends
> HoodieEngineContext {
>
> @Override
> public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I,
> Stream<O>> func, int parallelism) {
> + checkAndPropagateTokensRenewal();
> return
> data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
> }
>
> @@ -170,4 +193,42 @@ public final class HoodieLocalEngineContext extends
> HoodieEngineContext {
> public void cancelAllJobs() {
> // no operation for now
> }
> +
> + private void checkAndPropagateTokensRenewal() {
> + if (!UserGroupInformation.isSecurityEnabled()) {
> + return;
> + }
> +
> + try {
> + UserGroupInformation login = UserGroupInformation.getLoginUser();
> + UserGroupInformation current = UserGroupInformation.getCurrentUser();
> + LOG.debug("Login tokens:" + login.getCredentials().getAllTokens());
> + LOG.debug("Current tokens:" + current.getCredentials().getAllTokens());
> +
> + if (!compareCredentials(current.getCredentials(),
> login.getCredentials())) {
> + LOG.debug("Login and current are different, updating the tokens");
> + login.addCredentials(current.getCredentials());
> + LOG.debug(("Updated login tokens: " +
> login.getCredentials().getAllTokens()));
> + }
> + } catch (IOException e) {
> + throw new RuntimeException(e);
> + }
> + }
> +
> + private static boolean compareCredentials(Credentials credentials1,
> Credentials credentials2) {
> + // Get all the tokens from the first Credentials instance
> + Collection<Token<? extends TokenIdentifier>> tokens1 =
> credentials1.getAllTokens();
> +
> + // Get all the tokens from the second Credentials instance
> + Collection<Token<? extends TokenIdentifier>> tokens2 =
> credentials2.getAllTokens();
> +
> + // Check if the number of tokens is the same
> + if (tokens1.size() != tokens2.size()) {
> + return false;
> + }
> +
> +
> + // All tokens are the same
> + return tokens2.containsAll(tokens1);
> + }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)