Armelabdelkbir opened a new issue, #9213:
URL: https://github.com/apache/hudi/issues/9213

   
   **Describe the problem you faced**

   Hello, I'm using Hudi in production as part of my CDC pipeline (Debezium, Kafka, Spark). I have multiple jobs, and some of them fail with "Failed to rollback ..." errors.

   I don't have multiple writers running at a time, just one Spark Structured Streaming job per stream (one per database). I've hit this error in the past; cleaning everything and relaunching works, but that is not a production-grade approach for my largest database. The full error message is in the stacktrace below.
   
   **To Reproduce**
   Relaunch my production Spark job.
   
   
   **Expected behavior**
   
   Roll back the last delta commit and resume the stream.
   
   **Environment Description**
   
   * Hudi version : 0.11.0
   
   * Spark version : 3.4.1
   
   * Hive version : 1.2.1000
   
   * Hadoop version : 2.7.3
   
   * Storage (HDFS/S3/GCS..) : HDFS
   
   * Running on Docker? (yes/no) : no
   
   
   **Hoodie timeline**
   ```
   -rw-------   3 prd_foundry hadoop          0 2023-07-17 01:17 /prd/***/.hoodie/20230716231754202.rollback.inflight
   -rw-------   3 prd_foundry hadoop       1230 2023-07-17 01:17 /prd/***/.hoodie/20230716231754202.rollback.requested
   -rw-------   3 prd_foundry hadoop          0 2023-07-17 01:12 /prd/***/.hoodie/20230716231210452.deltacommit.requested
   drwx------   - prd_foundry hadoop          0 2023-07-17 01:08 /prd/***/.hoodie/.temp
   -rw-------   3 prd_foundry hadoop      81342 2023-07-17 01:08 /prd/***/.hoodie/20230716224726466.deltacommit
   -rw-------   3 prd_foundry hadoop      47568 2023-07-17 01:07 /prd/***/.hoodie/20230716224726466.deltacommit.inflight
   -rw-------   3 prd_foundry hadoop          0 2023-07-17 00:47 /prd/***/.hoodie/20230716224726466.deltacommit.requested
   -rw-------   3 prd_foundry hadoop       1413 2023-07-17 00:47 /prd/***/.hoodie/20230716224727964.rollback
   -rw-------   3 prd_foundry hadoop          0 2023-07-17 00:47 /prd/***/.hoodie/20230716224727964.rollback.inflight
   -rw-------   3 prd_foundry hadoop       1230 2023-07-17 00:47 /prd/***/.hoodie/20230716224727964.rollback.requested
   -rw-------   3 prd_foundry hadoop      51010 2023-07-17 00:30 /prd/***/.hoodie/20230717002819413.commit
   -rw-------   3 prd_foundry hadoop      11133 2023-07-17 00:29 /prd/***/.hoodie/20230717002839007.clean
   -rw-------   3 prd_foundry hadoop       9703 2023-07-17 00:29 /prd/***/.hoodie/20230717002839007.clean.inflight
   -rw-------   3 prd_foundry hadoop       9703 2023-07-17 00:29 /prd/***/.hoodie/20230717002839007.clean.requested
   -rw-------   3 prd_foundry hadoop          0 2023-07-17 00:28 /prd/***/.hoodie/20230717002819413.compaction.inflight
   -rw-------   3 prd_foundry hadoop      12258 2023-07-17 00:28 /prd/***/.hoodie/20230717002819413.compaction.requested
   drwx------   - prd_foundry hadoop          0 2023-07-17 00:28 /prd/***/.hoodie/.aux
   -rw-------   3 prd_foundry hadoop      36386 2023-07-17 00:27 /prd/***/.hoodie/20230716235643860.deltacommit
   -rw-------   3 prd_foundry hadoop      20594 2023-07-17 00:25 /prd/***/.hoodie/20230716235643860.deltacommit.inflight
   ```
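
   Reading the listing above: the rollback at `20230716231754202` appears stuck (it has `.requested` and `.inflight` files but no completed instant), its target delta commit `20230716231210452` only ever reached REQUESTED, and a completed instant with a greater timestamp (`20230717002819413.commit`, from the compaction) already exists. That matches the "Found commits after time" error in the stacktrace below. The same view can be produced programmatically; a minimal sketch, assuming the Hudi 0.11 meta-client API and a placeholder base path:

   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hudi.common.table.HoodieTableMetaClient
   import scala.collection.JavaConverters._

   object InspectTimeline extends App {
     val basePath = "hdfs:///prd/.../table" // placeholder: the real path is masked above

     val metaClient = HoodieTableMetaClient.builder()
       .setConf(new Configuration())
       .setBasePath(basePath)
       .build()

     // Print every instant on the active timeline with its action and
     // state (REQUESTED / INFLIGHT / COMPLETED), oldest first.
     metaClient.getActiveTimeline.getInstants.iterator().asScala.foreach { i =>
       println(s"${i.getTimestamp} ${i.getAction} ${i.getState}")
     }
   }
   ```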
   **Hudi conf**
   ```
   "hoodie.metadata.enable" -> "false"

   hudi {
     options {
       upsert_parallelisme_value = "1500"
       insert_parallelisme_value = "1500"
       bulk_insert_parallelisme_value = "1500"
       bulk_insert_sort_mode = "NONE"
       parquet_small_file_limit = "104857600"
       streaming_retry_count = "3"
       streaming_retry_interval_ms = "2000"
       parquet_max_file_size = "134217728"
       parquet_block_size = "134217728"
       parquet_page_size = "1048576"
       index_type = "SIMPLE"
       simple.index_use_caching = "true"
       simple.index_input_storage_level = "MEMORY_AND_DISK_SER"
     }
     compaction {
       inline_compact = "true"
       inline_compact_num_delta_commits = "10"
       cleaner_commits_retained = "10"
       cleaner_policy = "KEEP_LATEST_COMMITS"
       cleaner_fileversions_retained = "3"
       async_clean = "true"
     }
   }

   DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
   DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts_ms",
   DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> config.hudiConf.partition_fields,
   DataSourceWriteOptions.RECORDKEY_FIELD.key -> table.pks.mkString(","),
   DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
   ```
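
   For context, the keys above the `DataSourceWriteOptions` entries are our own application-level names that get mapped to Hudi options before the write. A rough sketch of the equivalent Spark write using the stock Hudi 0.11 option keys (`df`, the table name, and the helper are placeholders, not the actual production code):

   ```scala
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.spark.sql.{DataFrame, SaveMode}

   // Rough equivalent of the config above with stock Hudi 0.11 option keys.
   // `df` is the micro-batch DataFrame received in foreachBatch.
   def writeBatch(df: DataFrame, basePath: String): Unit = {
     df.write
       .format("hudi")
       .option("hoodie.table.name", "my_table") // placeholder
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts_ms")
       .option("hoodie.metadata.enable", "false")
       .option("hoodie.upsert.shuffle.parallelism", "1500")
       .option("hoodie.index.type", "SIMPLE")
       .option("hoodie.compact.inline", "true")
       .option("hoodie.compact.inline.max.delta.commits", "10")
       .option("hoodie.clean.async", "true")
       .option("hoodie.cleaner.commits.retained", "10")
       .mode(SaveMode.Append)
       .save(basePath)
   }
   ```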
   
   **Stacktrace**
   
   ```
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:356)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
   Caused by: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback /prd/***/ commits 20230716231210452
        at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:783)
        at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1193)
        at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1176)
        at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1164)
        at org.apache.hudi.client.BaseHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(BaseHoodieWriteClient.java:964)
        at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:151)
        at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:963)
        at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:956)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:303)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
        at com.ovh.ingest.Ingest$.test$1(Ingest.scala:117)
        at com.ovh.ingest.Ingest$.$anonfun$new$8(Ingest.scala:98)
        at com.ovh.ingest.Ingest$.$anonfun$new$8$adapted(Ingest.scala:98)
        at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
        ... 1 more
   Caused by: org.apache.hudi.exception.HoodieRollbackException: Found commits after time :20230716231210452, please rollback greater commits first
        at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.validateRollbackCommitSequence(BaseRollbackActionExecutor.java:181)
        at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.doRollbackAndGetStats(BaseRollbackActionExecutor.java:220)
        at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:118)
        at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:145)
        at org.apache.hudi.table.HoodieSparkMergeOnReadTable.rollback(HoodieSparkMergeOnReadTable.java:170)
        at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:766)
        ... 58 more
   ```
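
   What the nested exceptions say: on startup, `startCommitWithTime` eagerly tries to roll back the failed write `20230716231210452`, and `BaseRollbackActionExecutor.validateRollbackCommitSequence` aborts because a completed instant with a greater timestamp (`20230717002819413.commit`) is already on the timeline. Since the stuck delta commit never went past REQUESTED (0-byte plan file, no `.inflight` in the listing), one possible manual repair, purely as a hedged sketch and not an official Hudi procedure, is to remove its dangling plan files after stopping the writer and backing up `.hoodie`:

   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.{FileSystem, Path}

   // Hypothetical manual repair, NOT an official Hudi procedure: stop all
   // writers and back up the .hoodie directory first. The stuck delta commit
   // only ever reached REQUESTED, so no data files should exist for it;
   // removing its plan file plus the stale rollback plan should let the
   // next run start from a clean timeline.
   val basePath = "hdfs:///prd/.../table" // placeholder: real path masked in the issue
   val fs: FileSystem = new Path(basePath).getFileSystem(new Configuration())

   Seq(
     s"$basePath/.hoodie/20230716231210452.deltacommit.requested",
     s"$basePath/.hoodie/20230716231754202.rollback.inflight",
     s"$basePath/.hoodie/20230716231754202.rollback.requested"
   ).map(new Path(_)).filter(fs.exists).foreach(p => fs.delete(p, false))
   ```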
   
   

