jshmchenxi opened a new issue #2753:
URL: https://github.com/apache/iceberg/issues/2753


   We are using a Spark Thrift Server ([Kyuubi](https://github.com/NetEase/kyuubi)) to provide an ad-hoc query service for Iceberg.
   The global [WORKER_POOL](https://github.com/apache/iceberg/blob/apache-iceberg-0.11.1/core/src/main/java/org/apache/iceberg/util/ThreadPools.java#L41) is enabled by default.
   We found that after 3 days of running, some queries like `update test_table set data = 'abcd' where id = 1` start to throw the following exception:
   
   ```
   21/06/02 17:17:28 ERROR v2.ReplaceDataExec: Data source write support 
IcebergBatchWrite(table=spark_catalog.test.test_table, format=PARQUET) aborted.
   21/06/02 17:17:28 ERROR statement.ExecuteStatementInClientMode:
   Error executing query as venus,
   update test_table set data = 'abcd' where id = 1
   Current operation state RUNNING,
   org.apache.spark.SparkException: Writing job aborted.
           at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:413)
           at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
           at 
org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.writeWithV2(ReplaceDataExec.scala:26)
           at 
org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.run(ReplaceDataExec.scala:34)
           at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
           at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
           at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
           at 
org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
           at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
           at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
           at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
           at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
           at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:765)
           at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
           at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
           at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
           at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:92)
           at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:765)
           at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
           at 
org.apache.spark.sql.SparkSQLUtils$.toDataFrame(SparkSQLUtils.scala:39)
           at 
yaooqinn.kyuubi.operation.statement.ExecuteStatementInClientMode.execute(ExecuteStatementInClientMode.scala:181)
           at 
yaooqinn.kyuubi.operation.statement.ExecuteStatementOperation$$anon$1$$anon$2.run(ExecuteStatementOperation.scala:113)
           at 
yaooqinn.kyuubi.operation.statement.ExecuteStatementOperation$$anon$1$$anon$2.run(ExecuteStatementOperation.scala:109)
           at java.security.AccessController.doPrivileged(Native Method)
           at javax.security.auth.Subject.doAs(Subject.java:422)
           at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
           at 
yaooqinn.kyuubi.operation.statement.ExecuteStatementOperation$$anon$1.run(ExecuteStatementOperation.scala:109)
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
           at java.lang.Thread.run(Thread.java:745)
   Caused by: org.apache.iceberg.exceptions.RuntimeIOException: Failed to open 
input stream for file: 
hdfs://.../test.db/test_table/metadata/f774a200-1cd9-468d-9096-44af66538af6-m0.avro
           at 
org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:179)
           at 
org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101)
           at 
org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:66)
           at org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:103)
           at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:87)
           at 
org.apache.iceberg.SnapshotProducer.newManifestReader(SnapshotProducer.java:378)
           at 
org.apache.iceberg.MergingSnapshotProducer$DataFileFilterManager.newManifestReader(MergingSnapshotProducer.java:530)
           at 
org.apache.iceberg.ManifestFilterManager.filterManifest(ManifestFilterManager.java:290)
           at 
org.apache.iceberg.ManifestFilterManager.lambda$filterManifests$0(ManifestFilterManager.java:182)
           at 
org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
           at org.apache.iceberg.util.Tasks$Builder.access$300(Tasks.java:70)
           at org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:310)
           ... 5 more
    
    
   Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (token for abc: HDFS_DELEGATION_TOKEN owner=abc, renewer=yarn, 
realUser=spark/[email protected], issueDate=1622108637132, 
maxDate=1622713437132, sequenceNumber=105708024, masterKeyId=708) can't be 
found in cache
           at org.apache.hadoop.ipc.Client.call(Client.java:1472)
           at org.apache.hadoop.ipc.Client.call(Client.java:1409)
           at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
           at com.sun.proxy.$Proxy20.getBlockLocations(Unknown Source)
           at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:256)
           at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
           at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
           at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
           at com.sun.proxy.$Proxy21.getBlockLocations(Unknown Source)
           at 
org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1279)
           at 
org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1266)
           at 
org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1254)
           at 
org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:305)
           at 
org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:271)
           at 
org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:263)
           at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1585)
           at 
org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:326)
           at 
org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:322)
           at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
           at 
org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:322)
           at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:783)
           at 
org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:175)
           ... 16 more
   ```
   
   The reason is that WORKER_POOL is a singleton thread pool, initialized once when the service starts. The first time one of its threads is used, the UGI or credentials (which I'm not completely clear about) of the first query's context are left attached to that thread. When a later query runs on the same thread, the thread still holds the credentials from the first query, which causes this problem.
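
   For illustration, here is a minimal sketch of the pattern described above (the helper and variable names are hypothetical; `ThreadPools.getWorkerPool()` is the shared pool linked earlier). The submitted task touches HDFS with whatever credentials the long-lived worker thread happens to hold:

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Future;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.ManifestFile;
   import org.apache.iceberg.ManifestFiles;
   import org.apache.iceberg.ManifestReader;
   import org.apache.iceberg.io.FileIO;
   import org.apache.iceberg.util.ThreadPools;

   class SharedPoolSketch {
     // Hypothetical helper: read a manifest on the global WORKER_POOL.
     // The HDFS access inside ManifestFiles.read() runs with whatever
     // UGI/credentials the pooled thread already carries -- possibly the
     // (expired) delegation token of the first query that used it.
     static Future<ManifestReader<DataFile>> readOnWorkerPool(ManifestFile manifest, FileIO io) {
       ExecutorService workerPool = ThreadPools.getWorkerPool();
       return workerPool.submit(() -> ManifestFiles.read(manifest, io));
     }
   }
   ```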
   
   We should wrap tasks submitted to WORKER_POOL in ugi.doAs() whenever they access the cluster (e.g. HDFS).
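
   A minimal sketch of what that could look like (again with hypothetical helper names; this is only the direction, not an actual patch): capture the caller's UGI on the query thread, then re-assert it inside the pooled thread with `doAs()`, so the HDFS access uses the current query's credentials instead of whatever the thread was left with:

   ```java
   import java.security.PrivilegedExceptionAction;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Future;
   import org.apache.hadoop.security.UserGroupInformation;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.ManifestFile;
   import org.apache.iceberg.ManifestFiles;
   import org.apache.iceberg.ManifestReader;
   import org.apache.iceberg.io.FileIO;
   import org.apache.iceberg.util.ThreadPools;

   class DoAsWorkerPoolSketch {
     // Hypothetical helper: capture the current user's UGI on the calling
     // (query) thread, then run the manifest read inside ugi.doAs() on the
     // pooled thread so it uses the caller's delegation token rather than
     // the credentials left over from an earlier query.
     static Future<ManifestReader<DataFile>> readOnWorkerPool(ManifestFile manifest, FileIO io)
         throws Exception {
       UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
       ExecutorService workerPool = ThreadPools.getWorkerPool();
       Callable<ManifestReader<DataFile>> task = () ->
           ugi.doAs((PrivilegedExceptionAction<ManifestReader<DataFile>>) () ->
               ManifestFiles.read(manifest, io));
       return workerPool.submit(task);
     }
   }
   ```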

