jshmchenxi opened a new issue #2753: URL: https://github.com/apache/iceberg/issues/2753
We are using a Spark Thrift Server ([Kyuubi](https://github.com/NetEase/kyuubi)) to provide an ad hoc query service for Iceberg. The global [WORKER_POOL](https://github.com/apache/iceberg/blob/apache-iceberg-0.11.1/core/src/main/java/org/apache/iceberg/util/ThreadPools.java#L41) is enabled by default. We found that after 3 days of running, some queries like `update test_table set data = 'abcd' where id = 1` start to throw the following exception:

```
21/06/02 17:17:28 ERROR v2.ReplaceDataExec: Data source write support IcebergBatchWrite(table=spark_catalog.test.test_table, format=PARQUET) aborted.
21/06/02 17:17:28 ERROR statement.ExecuteStatementInClientMode: Error executing query as venus, update test_table set data = 'abcd' where id = 1
Current operation state RUNNING,
org.apache.spark.SparkException: Writing job aborted.
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:413)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
    at org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.writeWithV2(ReplaceDataExec.scala:26)
    at org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.run(ReplaceDataExec.scala:34)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
    at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:765)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:92)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:765)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
    at org.apache.spark.sql.SparkSQLUtils$.toDataFrame(SparkSQLUtils.scala:39)
    at yaooqinn.kyuubi.operation.statement.ExecuteStatementInClientMode.execute(ExecuteStatementInClientMode.scala:181)
    at yaooqinn.kyuubi.operation.statement.ExecuteStatementOperation$$anon$1$$anon$2.run(ExecuteStatementOperation.scala:113)
    at yaooqinn.kyuubi.operation.statement.ExecuteStatementOperation$$anon$1$$anon$2.run(ExecuteStatementOperation.scala:109)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
    at yaooqinn.kyuubi.operation.statement.ExecuteStatementOperation$$anon$1.run(ExecuteStatementOperation.scala:109)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.iceberg.exceptions.RuntimeIOException: Failed to open input stream for file: hdfs://.../test.db/test_table/metadata/f774a200-1cd9-468d-9096-44af66538af6-m0.avro
    at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:179)
    at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101)
    at org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:66)
    at org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:103)
    at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:87)
    at org.apache.iceberg.SnapshotProducer.newManifestReader(SnapshotProducer.java:378)
    at org.apache.iceberg.MergingSnapshotProducer$DataFileFilterManager.newManifestReader(MergingSnapshotProducer.java:530)
    at org.apache.iceberg.ManifestFilterManager.filterManifest(ManifestFilterManager.java:290)
    at org.apache.iceberg.ManifestFilterManager.lambda$filterManifests$0(ManifestFilterManager.java:182)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
    at org.apache.iceberg.util.Tasks$Builder.access$300(Tasks.java:70)
    at org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:310)
    ... 5 more
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for abc: HDFS_DELEGATION_TOKEN owner=abc, renewer=yarn, realUser=spark/[email protected], issueDate=1622108637132, maxDate=1622713437132, sequenceNumber=105708024, masterKeyId=708) can't be found in cache
    at org.apache.hadoop.ipc.Client.call(Client.java:1472)
    at org.apache.hadoop.ipc.Client.call(Client.java:1409)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
    at com.sun.proxy.$Proxy20.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:256)
    at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
    at com.sun.proxy.$Proxy21.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1279)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1266)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1254)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:305)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:271)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:263)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1585)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:326)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:322)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:322)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:783)
    at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:175)
    ... 16 more
```

The reason is that WORKER_POOL is a singleton thread pool, initialized when the service starts.
The first time a thread in WORKER_POOL runs a task, the UGI or credentials (I'm not entirely sure which) of the first query's context are left on that thread. When a later query reuses the same thread, it still carries the credentials from the first query, which causes this problem. We should wrap tasks in ugi.doAs() when using WORKER_POOL if they access the cluster (e.g. HDFS), as in the sketch below.
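To make the suggestion concrete, here is a minimal Java sketch of wrapping work submitted to the shared pool in the caller's `ugi.doAs()`. It assumes `ThreadPools.getWorkerPool()` is the accessor for the global WORKER_POOL; the helper `submitWithCallerUgi` is hypothetical and only shows where the `doAs()` wrapping would go, not an actual Iceberg API.

```java
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.util.ThreadPools;

public class UgiAwareTasks {

  // Hypothetical helper: capture the caller's UGI on the submitting thread and
  // re-apply it inside the shared worker pool, so HDFS calls use the caller's
  // delegation tokens instead of whatever UGI the pooled thread picked up first.
  public static <T> Future<T> submitWithCallerUgi(
      ExecutorService pool, PrivilegedExceptionAction<T> action) throws IOException {
    // Must be called on the submitting (caller) thread, not inside the pool.
    UserGroupInformation callerUgi = UserGroupInformation.getCurrentUser();
    return pool.submit(() -> callerUgi.doAs(action));
  }

  public static void main(String[] args) throws Exception {
    // Example usage against the global singleton pool.
    Future<Long> result = submitWithCallerUgi(
        ThreadPools.getWorkerPool(),
        () -> {
          // e.g. open a manifest file on HDFS here; runs under the caller's UGI
          return 0L;
        });
    System.out.println(result.get());
  }
}
```

The key point is capturing the UGI on the submitting thread rather than inside the pooled thread, since the pooled thread may still hold the UGI of whichever query touched it first.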
