KnightChess opened a new issue, #7057:
URL: https://github.com/apache/hudi/issues/7057
Environment:
- Hudi 0.11.0
- Spark 3.2.0

Action: Spark SQL `insert overwrite`
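For context, here is a minimal sketch of the writer setup under which this occurs. The config keys are from the Hudi 0.11 docs; the table names and lock provider choice are placeholder assumptions (ZooKeeper connection settings omitted):

```java
import org.apache.spark.sql.SparkSession;

public class OccOverwriteJob {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("occ-overwrite").getOrCreate();
    // Multi-writer OCC with lazy (heartbeat-based) cleaning of failed writes.
    spark.sql("set hoodie.write.concurrency.mode=optimistic_concurrency_control");
    spark.sql("set hoodie.cleaner.policy.failed.writes=LAZY");
    spark.sql("set hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
    // Two jobs running this concurrently produce the pending replacecommit
    // instants (002/003) in the timeline below; hudi_tbl/src_tbl are placeholders.
    spark.sql("insert overwrite table hudi_tbl select * from src_tbl");
    spark.stop();
  }
}
```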
Suppose we have the following timeline, with multiple writer jobs running under OCC:

00:01 001.replacecommit.inflight
00:01 001.replacecommit.requested
00:10 002.replacecommit.inflight
00:10 002.replacecommit.requested
00:10 003.replacecommit.inflight
00:10 003.replacecommit.requested
Task execution:
The `001` instant was produced by an earlier failed task. Now two jobs are running concurrently, producing the `002` and `003` instants. At the same time:
1. `002`'s job has already finished `CleanerUtils.rollbackFailedWrites(xxx)` in `postCommit` via the auto-commit clean. Since the default heartbeat expiry time is 2 minutes, the `001` instant files are deleted by this job.
2. Meanwhile, `003`'s job executes `BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded` and throws `HoodieException: Error getting all file groups in pending clustering`, because it reads the instant details for `001` from its cached active timeline, but that file has already been deleted by `002`'s job (see the sketch after this list).
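To make the failure concrete, here is a paraphrased sketch of the read pattern in `ClusteringUtils.getAllPendingClusteringPlans` (see `ClusteringUtils.java:69` in the trace below; a simplification, not the exact source):

```java
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

class PendingClusteringReadSketch {
  static void readPendingReplacePlans(HoodieTableMetaClient metaClient) {
    // The instant *list* comes from a cached timeline snapshot, so it still
    // contains 001 even after job 002 rolled it back...
    HoodieTimeline pendingReplace =
        metaClient.getActiveTimeline().filterPendingReplaceTimeline();
    pendingReplace.getInstants().forEach(instant -> {
      // ...but the instant *contents* are read from storage lazily, right
      // here: once 001.replacecommit.requested is gone from .hoodie/, this
      // throws HoodieIOException (FileNotFoundException underneath).
      Option<byte[]> requestedContent = pendingReplace.getInstantDetails(instant);
    });
  }
}
```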
It seems we would need to reload the active timeline in `setPartitionToReplaceFileIds`, but that alone cannot solve the problem, because we still cannot guarantee the reloaded timeline is the latest. I think `updateIndexAndCommitIfNeeded` needs to be moved so that the commit metadata is built after `beginTransaction`. I don't know whether it is appropriate to modify it this way, and it may stretch the lock-holding time.
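A rough sketch of that proposed ordering, assuming the 0.11 `TransactionManager` API (an untested illustration of the idea, not a patch; `commitWithFreshTimeline` and its parameters are hypothetical):

```java
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieTable;

class CommitOrderingSketch {
  // Untested illustration: take the lock first, reload the timeline under it,
  // and only then build and commit the replace metadata, so no concurrent
  // rollback can delete pending instants in between.
  void commitWithFreshTimeline(TransactionManager txnManager,
                               HoodieTable table,
                               Option<HoodieInstant> inflightInstant,
                               Option<HoodieInstant> lastCompletedTxn) {
    txnManager.beginTransaction(inflightInstant, lastCompletedTxn);
    try {
      // Refresh under the lock so setPartitionToReplaceFileIds and the
      // pending-clustering scan see a consistent view of .hoodie/.
      table.getMetaClient().reloadActiveTimeline();
      // ... updateIndexAndCommitIfNeeded(writeStatuses, result) would run here ...
    } finally {
      txnManager.endTransaction();
    }
  }
}
```

The trade-off, as noted above, is a longer lock-holding window. The full stack trace: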
```shell
org.apache.hudi.exception.HoodieException: Error getting all file groups in pending clustering
    at org.apache.hudi.common.util.ClusteringUtils.getAllFileGroupsInPendingClusteringPlans(ClusteringUtils.java:135)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.init(AbstractTableFileSystemView.java:113)
    at org.apache.hudi.common.table.view.HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:108)
    at org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:102)
    at org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:93)
    at org.apache.hudi.metadata.HoodieMetadataFileSystemView.<init>(HoodieMetadataFileSystemView.java:44)
    at org.apache.hudi.common.table.view.FileSystemViewManager.createInMemoryFileSystemView(FileSystemViewManager.java:166)
    at org.apache.hudi.common.table.view.FileSystemViewManager.lambda$createViewManager$5fcdabfe$1(FileSystemViewManager.java:259)
    at org.apache.hudi.common.table.view.FileSystemViewManager.lambda$getFileSystemView$1(FileSystemViewManager.java:111)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at org.apache.hudi.common.table.view.FileSystemViewManager.getFileSystemView(FileSystemViewManager.java:110)
    at org.apache.hudi.table.HoodieTable.getHoodieView(HoodieTable.java:310)
    at org.apache.hudi.client.BaseHoodieWriteClient.runTableServicesInline(BaseHoodieWriteClient.java:547)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:246)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:122)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:653)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:316)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:165)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
    at org.apache.spark.sql.hudi.catalog.HoodieV1WriteBuilder$$anon$1$$anon$2.insert(HoodieInternalV2Table.scala:116)
    at org.apache.spark.sql.execution.datasources.v2.SupportsV1Write.writeWithV1(V1FallbackWriters.scala:79)
    at org.apache.spark.sql.execution.datasources.v2.SupportsV1Write.writeWithV1$(V1FallbackWriters.scala:78)
    at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExecV1.writeWithV1(V1FallbackWriters.scala:51)
    at org.apache.spark.sql.execution.datasources.v2.V1FallbackWriters.run(V1FallbackWriters.scala:66)
    at org.apache.spark.sql.execution.datasources.v2.V1FallbackWriters.run$(V1FallbackWriters.scala:65)
    at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExecV1.run(V1FallbackWriters.scala:51)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:113)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:431)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:569)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:563)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:563)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:336)
    at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:474)
    at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:490)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:229)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:748)
Caused by: org.apache.hudi.exception.HoodieIOException: Could not read commit details from hdfs://xxx/.hoodie/20221024110608265.replacecommit.requested
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.readDataFromPath(HoodieActiveTimeline.java:761)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.getInstantDetails(HoodieActiveTimeline.java:266)
    at org.apache.hudi.common.util.ClusteringUtils.getRequestedReplaceMetadata(ClusteringUtils.java:90)
    at org.apache.hudi.common.util.ClusteringUtils.getClusteringPlan(ClusteringUtils.java:106)
    at org.apache.hudi.common.util.ClusteringUtils.lambda$getAllPendingClusteringPlans$0(ClusteringUtils.java:69)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.hudi.common.util.ClusteringUtils.getAllFileGroupsInPendingClusteringPlans(ClusteringUtils.java:129)
    ... 105 more
Caused by: java.io.FileNotFoundException: File does not exist: /user/prod_dd_cdm/dd_cdm_dev/hive/dd_cdm_dev/dwd_trip_mkt_soul_carbon_coin_get_di/.hoodie/20221024110608265.replacecommit.requested
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
    at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:153)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2045)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:760)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:432)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:524)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1025)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:886)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:828)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1903)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2716)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1316)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1301)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1289)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1614)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:359)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:354)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:367)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:837)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.open(HoodieWrapperFileSystem.java:460)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.readDataFromPath(HoodieActiveTimeline.java:758)
    ... 117 more
```