[
https://issues.apache.org/jira/browse/HUDI-5310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Drew Foulks updated HUDI-5310:
------------------------------
Reporter: Hussein Awala (was: Hussein Awala)
> Corrupted timeline after 2 concurrent INSERT_OVERWRITE_TABLE operations
> -----------------------------------------------------------------------
>
> Key: HUDI-5310
> URL: https://issues.apache.org/jira/browse/HUDI-5310
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Hussein Awala
> Priority: Major
>
> We have a non-partitioned table which we overwrite weekly using a Spark job
> created from Airflow. After a task restart in Airflow, we had two concurrent
> jobs which inserted the same data, and both finished successfully,
> but all the subsequent jobs failed with the following exception:
> {code:java}
> 2022-11-30 14:01:29,717 INFO yarn.ApplicationMaster: Final app status:
> FAILED, exitCode: 15, (reason: User class threw exception:
> org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit
> time 20221130140117316
> at
> org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
> at
> org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor.execute(SparkInsertOverwriteCommitActionExecutor.java:63)
> at
> org.apache.hudi.table.HoodieSparkCopyOnWriteTable.insertOverwriteTable(HoodieSparkCopyOnWriteTable.java:164)
> at
> org.apache.hudi.table.HoodieSparkCopyOnWriteTable.insertOverwriteTable(HoodieSparkCopyOnWriteTable.java:97)
> at
> org.apache.hudi.client.SparkRDDWriteClient.insertOverwriteTable(SparkRDDWriteClient.java:226)
> at
> org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:210)
> at
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:331)
> at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:144)
> at
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
> at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
> at
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
> at
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
> at
> org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
> at
> org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
> at
> org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
> at
> org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
> at
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
> at
> fr.leboncoin.data.lakehouse.datasource.HudiDatasource.operation(HudiDatasource.java:224)
> at
> fr.leboncoin.data.lakehouse.datasource.HudiDatasource.insertOverwriteTable(HudiDatasource.java:262)
> at
> fr.leboncoin.data.lakehouse.run.PrivacyClosedStores.run(PrivacyClosedStores.java:105)
> at
> fr.leboncoin.data.lakehouse.run.PrivacyClosedStores.main(PrivacyClosedStores.java:121)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:737)
> Caused by: java.lang.IllegalStateException: Duplicate key
> [20221120004252778__replacecommit__COMPLETED]
> at
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
> at java.util.HashMap.merge(HashMap.java:1255)
> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> at
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> at
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> at
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> at java.util.Iterator.forEachRemaining(Iterator.java:116)
> at
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> at
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> at
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> at
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.resetFileGroupsReplaced(AbstractTableFileSystemView.java:242)
> at
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.init(AbstractTableFileSystemView.java:108)
> at
> org.apache.hudi.common.table.view.HoodieTableFileSystemView.init(HoodieTableFileSystemView.java:108)
> at
> org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:102)
> at
> org.apache.hudi.common.table.view.HoodieTableFileSystemView.<init>(HoodieTableFileSystemView.java:93)
> at
> org.apache.hudi.table.HoodieTable.getFileSystemView(HoodieTable.java:290)
> at
> org.apache.hudi.table.action.commit.UpsertPartitioner.getPartitionPathToPendingClusteringFileGroupsId(UpsertPartitioner.java:136)
> at
> org.apache.hudi.table.action.commit.UpsertPartitioner.assignInserts(UpsertPartitioner.java:169)
> at
> org.apache.hudi.table.action.commit.UpsertPartitioner.<init>(UpsertPartitioner.java:95)
> at
> org.apache.hudi.table.action.commit.SparkInsertOverwritePartitioner.<init>(SparkInsertOverwritePartitioner.java:41)
> at
> org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor.getPartitioner(SparkInsertOverwriteCommitActionExecutor.java:70)
> at
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:161)
> at
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:85)
> at
> org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:57)
> ... 45 more
> ) {code}
> After debugging, we found a bug either in [this
> method|https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java#L218],
> which finds the same {{partitionToReplaceFileIds}} in the two commits, or in
> the OCC, which doesn't work with the insert overwrite table operation.
> Timeline:
> {code:java}
> 20221120004340987.replacecommit -> 2
> 20221120004340987.replacecommit.inflight -> 2
> 20221120004252778.replacecommit -> 1
> 20221120004340987.replacecommit.requested -> 2
> 20221120004252778.replacecommit.inflight -> 1
> 20221120004252778.replacecommit.requested -> 1 {code}
> Commit bodies:
> {code:json}
> 20221120004252778.replacecommit:
> {
> "partitionToWriteStats" : {
> "" : [ {
> "fileId" : "1721d4dc-5806-4137-8bff-5af556339959-0",
> "path" :
> "1721d4dc-5806-4137-8bff-5af556339959-0_0-6-211_20221120004252778.parquet",
> "prevCommit" : "null",
> "numWrites" : 122984,
> "numDeletes" : 0,
> "numUpdateWrites" : 0,
> "numInserts" : 122984,
> "totalWriteBytes" : 4139555,
> "totalWriteErrors" : 0,
> "tempPath" : null,
> "partitionPath" : "",
> "totalLogRecords" : 0,
> "totalLogFilesCompacted" : 0,
> "totalLogSizeCompacted" : 0,
> "totalUpdatedRecordsCompacted" : 0,
> "totalLogBlocks" : 0,
> "totalCorruptLogBlock" : 0,
> "totalRollbackBlocks" : 0,
> "fileSizeInBytes" : 4139555,
> "minEventTime" : null,
> "maxEventTime" : null
> }, {
> "fileId" : "fc9ca09d-e440-40a2-ac08-c7013bcf6d25-0",
> "path" :
> "fc9ca09d-e440-40a2-ac08-c7013bcf6d25-0_1-6-212_20221120004252778.parquet",
> "prevCommit" : "null",
> "numWrites" : 122488,
> "numDeletes" : 0,
> "numUpdateWrites" : 0,
> "numInserts" : 122488,
> "totalWriteBytes" : 4126038,
> "totalWriteErrors" : 0,
> "tempPath" : null,
> "partitionPath" : "",
> "totalLogRecords" : 0,
> "totalLogFilesCompacted" : 0,
> "totalLogSizeCompacted" : 0,
> "totalUpdatedRecordsCompacted" : 0,
> "totalLogBlocks" : 0,
> "totalCorruptLogBlock" : 0,
> "totalRollbackBlocks" : 0,
> "fileSizeInBytes" : 4126038,
> "minEventTime" : null,
> "maxEventTime" : null
> }, {
> "fileId" : "e295911b-1100-4056-bf2a-7fc8fa778a9f-0",
> "path" :
> "e295911b-1100-4056-bf2a-7fc8fa778a9f-0_2-6-213_20221120004252778.parquet",
> "prevCommit" : "null",
> "numWrites" : 123101,
> "numDeletes" : 0,
> "numUpdateWrites" : 0,
> "numInserts" : 123101,
> "totalWriteBytes" : 4147197,
> "totalWriteErrors" : 0,
> "tempPath" : null,
> "partitionPath" : "",
> "totalLogRecords" : 0,
> "totalLogFilesCompacted" : 0,
> "totalLogSizeCompacted" : 0,
> "totalUpdatedRecordsCompacted" : 0,
> "totalLogBlocks" : 0,
> "totalCorruptLogBlock" : 0,
> "totalRollbackBlocks" : 0,
> "fileSizeInBytes" : 4147197,
> "minEventTime" : null,
> "maxEventTime" : null
> }, {
> "fileId" : "8d2e26c1-8c92-49b7-9ba6-365b676d4e1a-0",
> "path" :
> "8d2e26c1-8c92-49b7-9ba6-365b676d4e1a-0_3-6-214_20221120004252778.parquet",
> "prevCommit" : "null",
> "numWrites" : 123123,
> "numDeletes" : 0,
> "numUpdateWrites" : 0,
> "numInserts" : 123123,
> "totalWriteBytes" : 4153959,
> "totalWriteErrors" : 0,
> "tempPath" : null,
> "partitionPath" : "",
> "totalLogRecords" : 0,
> "totalLogFilesCompacted" : 0,
> "totalLogSizeCompacted" : 0,
> "totalUpdatedRecordsCompacted" : 0,
> "totalLogBlocks" : 0,
> "totalCorruptLogBlock" : 0,
> "totalRollbackBlocks" : 0,
> "fileSizeInBytes" : 4153959,
> "minEventTime" : null,
> "maxEventTime" : null
> }, {
> "fileId" : "59ec3de4-73de-4025-83e0-f39eee9a1fc6-0",
> "path" :
> "59ec3de4-73de-4025-83e0-f39eee9a1fc6-0_4-6-215_20221120004252778.parquet",
> "prevCommit" : "null",
> "numWrites" : 115347,
> "numDeletes" : 0,
> "numUpdateWrites" : 0,
> "numInserts" : 115347,
> "totalWriteBytes" : 3946550,
> "totalWriteErrors" : 0,
> "tempPath" : null,
> "partitionPath" : "",
> "totalLogRecords" : 0,
> "totalLogFilesCompacted" : 0,
> "totalLogSizeCompacted" : 0,
> "totalUpdatedRecordsCompacted" : 0,
> "totalLogBlocks" : 0,
> "totalCorruptLogBlock" : 0,
> "totalRollbackBlocks" : 0,
> "fileSizeInBytes" : 3946550,
> "minEventTime" : null,
> "maxEventTime" : null
> } ]
> },
> "compacted" : false,
> "extraMetadata" : {
> "schema" : "{...},
> "operationType" : "INSERT_OVERWRITE_TABLE",
> "partitionToReplaceFileIds" : {
> "" : [ "93081ea3-6e42-4a9f-b34c-f643098e5194-0",
> "440b39d3-6d3c-4f59-b753-fc81d56c5a1f-0",
> "3a309f89-40ba-4e12-a389-a26677f35fd2-0",
> "32f2b395-a9f5-450f-8086-45996a439678-0",
> "25dc73d9-3e05-40ac-b45a-d7b87c05a678-0" ]
> }
> } {code}
> {code:json}
> 20221120004340987.replacecommit:
> {
> "partitionToWriteStats" : {
> "" : [ {
> "fileId" : "a479538d-4423-4594-97ec-28f21ffa2030-0",
> "path" :
> "a479538d-4423-4594-97ec-28f21ffa2030-0_0-6-211_20221120004340987.parquet",
> "prevCommit" : "null",
> "numWrites" : 122984,
> "numDeletes" : 0,
> "numUpdateWrites" : 0,
> "numInserts" : 122984,
> "totalWriteBytes" : 4139440,
> "totalWriteErrors" : 0,
> "tempPath" : null,
> "partitionPath" : "",
> "totalLogRecords" : 0,
> "totalLogFilesCompacted" : 0,
> "totalLogSizeCompacted" : 0,
> "totalUpdatedRecordsCompacted" : 0,
> "totalLogBlocks" : 0,
> "totalCorruptLogBlock" : 0,
> "totalRollbackBlocks" : 0,
> "fileSizeInBytes" : 4139440,
> "minEventTime" : null,
> "maxEventTime" : null
> }, {
> "fileId" : "58f19fb8-9f05-4160-8f80-36123937081d-0",
> "path" :
> "58f19fb8-9f05-4160-8f80-36123937081d-0_1-6-212_20221120004340987.parquet",
> "prevCommit" : "null",
> "numWrites" : 122488,
> "numDeletes" : 0,
> "numUpdateWrites" : 0,
> "numInserts" : 122488,
> "totalWriteBytes" : 4141374,
> "totalWriteErrors" : 0,
> "tempPath" : null,
> "partitionPath" : "",
> "totalLogRecords" : 0,
> "totalLogFilesCompacted" : 0,
> "totalLogSizeCompacted" : 0,
> "totalUpdatedRecordsCompacted" : 0,
> "totalLogBlocks" : 0,
> "totalCorruptLogBlock" : 0,
> "totalRollbackBlocks" : 0,
> "fileSizeInBytes" : 4141374,
> "minEventTime" : null,
> "maxEventTime" : null
> }, {
> "fileId" : "44a0539f-4e97-44e1-b1f1-c0c809fe4c64-0",
> "path" :
> "44a0539f-4e97-44e1-b1f1-c0c809fe4c64-0_2-6-213_20221120004340987.parquet",
> "prevCommit" : "null",
> "numWrites" : 123101,
> "numDeletes" : 0,
> "numUpdateWrites" : 0,
> "numInserts" : 123101,
> "totalWriteBytes" : 4149562,
> "totalWriteErrors" : 0,
> "tempPath" : null,
> "partitionPath" : "",
> "totalLogRecords" : 0,
> "totalLogFilesCompacted" : 0,
> "totalLogSizeCompacted" : 0,
> "totalUpdatedRecordsCompacted" : 0,
> "totalLogBlocks" : 0,
> "totalCorruptLogBlock" : 0,
> "totalRollbackBlocks" : 0,
> "fileSizeInBytes" : 4149562,
> "minEventTime" : null,
> "maxEventTime" : null
> }, {
> "fileId" : "eb81d6d3-898e-41a3-8c93-33cc1b3bd4d9-0",
> "path" :
> "eb81d6d3-898e-41a3-8c93-33cc1b3bd4d9-0_3-6-214_20221120004340987.parquet",
> "prevCommit" : "null",
> "numWrites" : 123123,
> "numDeletes" : 0,
> "numUpdateWrites" : 0,
> "numInserts" : 123123,
> "totalWriteBytes" : 4152918,
> "totalWriteErrors" : 0,
> "tempPath" : null,
> "partitionPath" : "",
> "totalLogRecords" : 0,
> "totalLogFilesCompacted" : 0,
> "totalLogSizeCompacted" : 0,
> "totalUpdatedRecordsCompacted" : 0,
> "totalLogBlocks" : 0,
> "totalCorruptLogBlock" : 0,
> "totalRollbackBlocks" : 0,
> "fileSizeInBytes" : 4152918,
> "minEventTime" : null,
> "maxEventTime" : null
> }, {
> "fileId" : "8ad225d8-286c-42ae-b9bd-359e39f57e3d-0",
> "path" :
> "8ad225d8-286c-42ae-b9bd-359e39f57e3d-0_4-6-215_20221120004340987.parquet",
> "prevCommit" : "null",
> "numWrites" : 115347,
> "numDeletes" : 0,
> "numUpdateWrites" : 0,
> "numInserts" : 115347,
> "totalWriteBytes" : 3940020,
> "totalWriteErrors" : 0,
> "tempPath" : null,
> "partitionPath" : "",
> "totalLogRecords" : 0,
> "totalLogFilesCompacted" : 0,
> "totalLogSizeCompacted" : 0,
> "totalUpdatedRecordsCompacted" : 0,
> "totalLogBlocks" : 0,
> "totalCorruptLogBlock" : 0,
> "totalRollbackBlocks" : 0,
> "fileSizeInBytes" : 3940020,
> "minEventTime" : null,
> "maxEventTime" : null
> } ]
> },
> "compacted" : false,
> "extraMetadata" : {
> "schema" : "{...},
> "operationType" : "INSERT_OVERWRITE_TABLE",
> "partitionToReplaceFileIds" : {
> "" : [ "93081ea3-6e42-4a9f-b34c-f643098e5194-0",
> "440b39d3-6d3c-4f59-b753-fc81d56c5a1f-0",
> "3a309f89-40ba-4e12-a389-a26677f35fd2-0",
> "32f2b395-a9f5-450f-8086-45996a439678-0",
> "25dc73d9-3e05-40ac-b45a-d7b87c05a678-0" ]
> }
> }{code}
> After manually deleting the file {{20221120004252778.replacecommit}}, the
> problem was solved and the next job completed successfully.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)