xicm opened a new issue, #10891:
URL: https://github.com/apache/hudi/issues/10891

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   
   - Join the mailing list to engage in conversations and get faster support at 
[email protected].
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
When Spark Structured Streaming ingests data with `hoodie.metadata.enable=true`, the async compaction writes a DELTACOMMIT instant to the metadata table (MDT). Because the compaction runs asynchronously, the data writer rolls back the inflight delta commit in the MDT; when the compaction finishes, the compaction writer finds that the inflight deltacommit no longer exists and throws an exception.
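   For context, a minimal sketch of the write configuration that produces this scenario (metadata table enabled, inline compaction off, async compaction on for the streaming sink); the table name and type here are assumptions, not taken from the report:
   
   ```
   hoodie.table.name=hudi40
   hoodie.datasource.write.table.type=MERGE_ON_READ
   hoodie.metadata.enable=true
   hoodie.compact.inline=false
   hoodie.datasource.compaction.async.enable=true
   ```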
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1.
   2.
   3.
   4.
   
   **Expected behavior**
   
   A clear and concise description of what you expected to happen.
   
   **Environment Description**
   
   * Hudi version :
   
   * Spark version :
   
   * Hive version :
   
   * Hadoop version :
   
   * Storage (HDFS/S3/GCS..) :
   
   * Running on Docker? (yes/no) :
   
   
   **Additional context**
   
   Add any other context about the problem here.
   
   **Stacktrace**
   
   ```
   24/03/20 09:51:23 INFO BaseRollbackActionExecutor: Rolled back inflight 
instant 20240320095114761
   24/03/20 09:51:23 INFO BaseRollbackActionExecutor: Index rolled back for 
commits [==>20240320095114761__deltacommit__INFLIGHT__20240320095151179]
   24/03/20 09:51:23 INFO BaseRollbackActionExecutor: Deleting 
instant=[==>20240320095114761__deltacommit__INFLIGHT__20240320095151179]
   24/03/20 09:51:23 INFO HoodieActiveTimeline: Deleting instant 
[==>20240320095114761__deltacommit__INFLIGHT__20240320095151179]
   24/03/20 09:51:24 INFO HoodieActiveTimeline: Removed instant 
[==>20240320095114761__deltacommit__INFLIGHT__20240320095151179]
   24/03/20 09:51:24 INFO HoodieActiveTimeline: Deleting instant 
[==>20240320095114761__deltacommit__REQUESTED]
   24/03/20 09:51:24 INFO HoodieSparkSqlWriterInternal: 
Config.inlineCompactionEnabled ? false
   24/03/20 09:51:24 INFO HoodieSparkSqlWriterInternal: 
Config.asyncClusteringEnabled ? false
   24/03/20 09:51:24 INFO MapPartitionsRDD: Removing RDD 570 from persistence 
list
   24/03/20 09:51:24 INFO BlockManager: Removing RDD 570
   24/03/20 09:51:24 INFO MapPartitionsRDD: Removing RDD 558 from persistence 
list
   24/03/20 09:51:24 INFO BlockManager: Removing RDD 558
   24/03/20 09:51:24 INFO MapPartitionsRDD: Removing RDD 562 from persistence 
list
   24/03/20 09:51:24 INFO BlockManager: Removing RDD 562
   24/03/20 09:51:24 INFO MapPartitionsRDD: Removing RDD 604 from persistence 
list
   24/03/20 09:51:24 INFO BlockManager: Removing RDD 604
   24/03/20 09:51:24 INFO UnionRDD: Removing RDD 581 from persistence list
   24/03/20 09:51:24 INFO BlockManager: Removing RDD 581
   24/03/20 09:51:24 ERROR HoodieStreamingSink: Micro batch id=10 threw 
following exception:
   org.apache.hudi.exception.HoodieRollbackException: Failed to rollback 
/tmp/hoodie/hudi40/.hoodie/metadata commits 20240320095114761
        at 
org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1065)
        at 
org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1012)
        at 
org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:940)
        at 
org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:922)
        at 
org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:917)
        at 
org.apache.hudi.client.BaseHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(BaseHoodieWriteClient.java:941)
        at 
org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:222)
        at 
org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:940)
        at 
org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:925)
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1092)
        at 
org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:117)
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:810)
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.updateFromWriteStatuses(HoodieBackedTableMetadataWriter.java:865)
        at 
org.apache.hudi.client.BaseHoodieWriteClient.writeTableMetadata(BaseHoodieWriteClient.java:363)
        at 
org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:286)
        at 
org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:236)
        at 
org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:104)
        at 
org.apache.hudi.HoodieSparkSqlWriterInternal.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:1081)
        at 
org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:520)
        at 
org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
        at 
org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:138)
        at scala.util.Try$.apply(Try.scala:213)
        at 
org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$1(HoodieStreamingSink.scala:130)
        at 
org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:234)
        at 
org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:129)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
        at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
   Caused by: org.apache.hudi.exception.HoodieIOException: Could not delete 
instant [==>20240320095114761__deltacommit__REQUESTED] with path 
/tmp/hoodie/hudi40/.hoodie/metadata/.hoodie/20240320095114761.deltacommit.requested
        at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:301)
        at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deletePending(HoodieActiveTimeline.java:243)
        at 
org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.deleteInflightAndRequestedInstant(BaseRollbackActionExecutor.java:294)
        at 
org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.finishRollback(BaseRollbackActionExecutor.java:259)
        at 
org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:117)
        at 
org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:138)
        at 
org.apache.hudi.table.HoodieSparkMergeOnReadTable.rollback(HoodieSparkMergeOnReadTable.java:218)
        at 
org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1048)
        ... 49 more
   24/03/20 09:51:24 INFO HoodieStreamingSink: Retrying the failed micro batch 
id=10 ...
   24/03/20 09:51:24 INFO BlockManagerInfo: Added rdd_604_0 in memory on 
host-10-19-29-165:40389 (size: 512.0 B, free: 365.9 MiB)
   24/03/20 09:51:24 INFO TaskSetManager: Finished task 0.0 in stage 225.0 (TID 
246) in 989 ms on host-10-19-29-165 (executor 2) (1/1)
   24/03/20 09:51:24 INFO YarnScheduler: Removed TaskSet 225.0, whose tasks 
have all completed, from pool
   24/03/20 09:51:24 INFO DAGScheduler: ResultStage 225 (start at <pastie>:64) 
finished in 1.218 s
   24/03/20 09:51:24 INFO DAGScheduler: Job 138 is finished. Cancelling 
potential speculative or zombie tasks for this job
   24/03/20 09:51:24 INFO YarnScheduler: Killing all running tasks in stage 
225: Stage finished
   24/03/20 09:51:24 INFO DAGScheduler: Job 138 finished: start at <pastie>:64, 
took 1.410154 s
   24/03/20 09:51:24 INFO CommitUtils: Creating  metadata for UPSERT_PREPPED 
numWriteStats:1 numReplaceFileIds:0
   24/03/20 09:51:24 INFO BaseSparkCommitActionExecutor: Committing 
20240320095114761, action Type deltacommit, operation Type UPSERT_PREPPED
   24/03/20 09:51:24 INFO SparkContext: Starting job: start at <pastie>:64
   24/03/20 09:51:24 INFO DAGScheduler: Got job 141 (start at <pastie>:64) with 
1 output partitions
   24/03/20 09:51:24 INFO DAGScheduler: Final stage: ResultStage 229 (start at 
<pastie>:64)
   24/03/20 09:51:24 INFO DAGScheduler: Parents of final stage: List()
   24/03/20 09:51:24 INFO DAGScheduler: Missing parents: List()
   24/03/20 09:51:24 INFO DAGScheduler: Submitting ResultStage 229 
(MapPartitionsRDD[616] at start at <pastie>:64), which has no missing parents
   24/03/20 09:51:24 INFO MemoryStore: Block broadcast_196 stored as values in 
memory (estimated size 154.3 KiB, free 364.3 MiB)
   24/03/20 09:51:24 INFO MemoryStore: Block broadcast_196_piece0 stored as 
bytes in memory (estimated size 53.9 KiB, free 364.2 MiB)
   24/03/20 09:51:24 INFO BlockManagerInfo: Added broadcast_196_piece0 in 
memory on host-10-19-29-153:57823 (size: 53.9 KiB, free: 365.8 MiB)
   24/03/20 09:51:24 INFO SparkContext: Created broadcast 196 from broadcast at 
DAGScheduler.scala:1513
   24/03/20 09:51:24 INFO DAGScheduler: Submitting 1 missing tasks from 
ResultStage 229 (MapPartitionsRDD[616] at start at <pastie>:64) (first 15 tasks 
are for partitions Vector(0))
   24/03/20 09:51:24 INFO YarnScheduler: Adding task set 229.0 with 1 tasks 
resource profile 0
   24/03/20 09:51:24 INFO TaskSetManager: Starting task 0.0 in stage 229.0 (TID 
250) (host-10-19-29-165, executor 2, partition 0, PROCESS_LOCAL, 4449 bytes) 
taskResourceAssignments Map()
   24/03/20 09:51:24 INFO BlockManagerInfo: Added broadcast_196_piece0 in 
memory on host-10-19-29-165:40389 (size: 53.9 KiB, free: 365.9 MiB)
   24/03/20 09:51:24 INFO TaskSetManager: Finished task 0.0 in stage 229.0 (TID 
250) in 46 ms on host-10-19-29-165 (executor 2) (1/1)
   24/03/20 09:51:24 INFO YarnScheduler: Removed TaskSet 229.0, whose tasks 
have all completed, from pool
   24/03/20 09:51:24 INFO DAGScheduler: ResultStage 229 (start at <pastie>:64) 
finished in 0.085 s
   24/03/20 09:51:24 INFO DAGScheduler: Job 141 is finished. Cancelling 
potential speculative or zombie tasks for this job
   24/03/20 09:51:24 INFO YarnScheduler: Killing all running tasks in stage 
229: Stage finished
   24/03/20 09:51:24 INFO DAGScheduler: Job 141 finished: start at <pastie>:64, 
took 0.088189 s
   24/03/20 09:51:24 INFO HoodieActiveTimeline: Marking instant complete 
[==>20240320095114761__deltacommit__INFLIGHT]
   24/03/20 09:51:24 INFO HoodieActiveTimeline: Checking for file exists 
?/tmp/hoodie/hudi40/.hoodie/metadata/.hoodie/20240320095114761.deltacommit.inflight
   24/03/20 09:51:24 ERROR AsyncCompactService: Compactor executor failed
   org.apache.hudi.exception.HoodieException: Failed to update metadata
        at 
org.apache.hudi.client.BaseHoodieTableServiceClient.writeTableMetadata(BaseHoodieTableServiceClient.java:706)
        at 
org.apache.hudi.client.BaseHoodieTableServiceClient.completeCompaction(BaseHoodieTableServiceClient.java:330)
        at 
org.apache.hudi.client.BaseHoodieTableServiceClient.commitCompaction(BaseHoodieTableServiceClient.java:315)
        at 
org.apache.hudi.client.BaseHoodieWriteClient.commitCompaction(BaseHoodieWriteClient.java:1077)
        at 
org.apache.hudi.client.HoodieSparkCompactor.compact(HoodieSparkCompactor.java:61)
        at 
org.apache.hudi.async.AsyncCompactService.lambda$null$0(AsyncCompactService.java:84)
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.IllegalArgumentException
        at 
org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:33)
        at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:618)
        at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:599)
        at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:224)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.commit(BaseSparkCommitActionExecutor.java:311)
        at 
org.apache.hudi.table.action.commit.BaseCommitActionExecutor.autoCommit(BaseCommitActionExecutor.java:201)
        at 
org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:183)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:279)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:184)
        at 
org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:44)
        at 
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:126)
        at 
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:88)
        at 
org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:156)
        at 
org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:63)
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1099)
        at 
org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:117)
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:810)
        at 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.updateFromWriteStatuses(HoodieBackedTableMetadataWriter.java:865)
        at 
org.apache.hudi.client.BaseHoodieTableServiceClient.writeTableMetadata(BaseHoodieTableServiceClient.java:701)
        ... 9 more
   24/03/20 09:51:24 INFO HoodieStreamingSink: Async Compactor shutdown. 
Errored ? true
   ```
   
   

