KnightChess commented on issue #6679:
URL: https://github.com/apache/hudi/issues/6679#issuecomment-1288632725

   @nsivabalan thanks reply, I use 0.11.0 version. But we are batch job, not 
streaming job. Follow the config which you advice in code, I found the 
execption processing logic in streaming model, I will try to refer it to 
implement my logic in batch job. thanks
   ```scala
         Try(
           HoodieSparkSqlWriter.write(
             sqlContext, mode, updatedOptions, data, hoodieTableConfig, 
writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
         ) match {
           case Success((true, commitOps, compactionInstantOps, 
clusteringInstant, client, tableConfig)) =>
             log.info(s"Micro batch id=$batchId succeeded"
               + (commitOps.isPresent match {
               case true => s" for commit=${commitOps.get()}"
               case _ => s" with no new commits"
             }))
             writeClient = Some(client)
             hoodieTableConfig = Some(tableConfig)
             if (compactionInstantOps.isPresent) {
               asyncCompactorService.enqueuePendingAsyncServiceInstant(
                 new HoodieInstant(State.REQUESTED, 
HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get()))
             }
             if (clusteringInstant.isPresent) {
               asyncClusteringService.enqueuePendingAsyncServiceInstant(new 
HoodieInstant(
                 State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, 
clusteringInstant.get()
               ))
             }
             Success((true, commitOps, compactionInstantOps))
           case Failure(e) =>
             // clean up persist rdds in the write process
             data.sparkSession.sparkContext.getPersistentRDDs
               .foreach {
                 case (id, rdd) =>
                   try {
                     rdd.unpersist()
                   } catch {
                     case t: Exception => log.warn("Got excepting trying to 
unpersist rdd", t)
                   }
               }
             log.error(s"Micro batch id=$batchId threw following exception: ", 
e)
             if (ignoreFailedBatch) {
               log.info(s"Ignore the exception and move on streaming as per " +
                 s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} 
configuration")
               Success((true, None, None))
             } else {
               if (retryCnt > 1) log.info(s"Retrying the failed micro batch 
id=$batchId ...")
               Failure(e)
             }
           case Success((false, commitOps, compactionInstantOps, 
clusteringInstant, client, tableConfig)) =>
             log.error(s"Micro batch id=$batchId ended up with errors"
               + (commitOps.isPresent match {
                 case true =>  s" for commit=${commitOps.get()}"
                 case _ => s""
               }))
             if (ignoreFailedBatch) {
               log.info(s"Ignore the errors and move on streaming as per " +
                 s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} 
configuration")
               Success((true, None, None))
             } else {
               if (retryCnt > 1) log.info(s"Retrying the failed micro batch 
id=$batchId ...")
               Failure(new HoodieCorruptedDataException(s"Micro batch 
id=$batchId ended up with errors"))
             }
         }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to