KnightChess commented on issue #6679:
URL: https://github.com/apache/hudi/issues/6679#issuecomment-1288632725
@nsivabalan thanks for the reply; I am using version 0.11.0. However, ours is a
batch job, not a streaming job. Following the config you advised in the code, I
found the exception-processing logic in the streaming model; I will try to refer
to it to implement similar logic in my batch job. Thanks.
```scala
// Run the Hudi write for this micro batch and fold the three possible
// outcomes into a Try: success (cache client/config, enqueue async
// compaction/clustering), a thrown exception (unpersist cached RDDs,
// then ignore or retry per config), or a completed-with-errors result.
// NOTE(review): this expression depends on enclosing mutable state
// (writeClient, hoodieTableConfig, asyncCompactorService, ...) defined
// outside this snippet.
Try(
  HoodieSparkSqlWriter.write(
    sqlContext, mode, updatedOptions, data, hoodieTableConfig,
    writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
) match {
  case Success((true, commitOps, compactionInstantOps,
      clusteringInstant, client, tableConfig)) =>
    log.info(s"Micro batch id=$batchId succeeded"
      + (if (commitOps.isPresent) s" for commit=${commitOps.get()}"
         else " with no new commits"))
    // Cache the write client and table config so later batches reuse them.
    writeClient = Some(client)
    hoodieTableConfig = Some(tableConfig)
    // If the write scheduled a compaction, hand it to the async service.
    if (compactionInstantOps.isPresent) {
      asyncCompactorService.enqueuePendingAsyncServiceInstant(
        new HoodieInstant(State.REQUESTED,
          HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get()))
    }
    // Likewise for a scheduled clustering (replace-commit) instant.
    if (clusteringInstant.isPresent) {
      asyncClusteringService.enqueuePendingAsyncServiceInstant(new HoodieInstant(
        State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION,
        clusteringInstant.get()))
    }
    Success((true, commitOps, compactionInstantOps))
  case Failure(e) =>
    // Clean up any RDDs persisted during the failed write attempt;
    // unpersist failures are logged but never mask the original error.
    data.sparkSession.sparkContext.getPersistentRDDs
      .foreach {
        case (id, rdd) =>
          try {
            rdd.unpersist()
          } catch {
            case t: Exception => log.warn("Got exception trying to unpersist rdd", t)
          }
      }
    log.error(s"Micro batch id=$batchId threw following exception: ", e)
    if (ignoreFailedBatch) {
      // Swallow the failure and keep the stream alive, as configured.
      log.info(s"Ignore the exception and move on streaming as per " +
        s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
      Success((true, None, None))
    } else {
      if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
      Failure(e)
    }
  case Success((false, commitOps, compactionInstantOps,
      clusteringInstant, client, tableConfig)) =>
    // Write completed but reported errors (first tuple element is false).
    log.error(s"Micro batch id=$batchId ended up with errors"
      + (if (commitOps.isPresent) s" for commit=${commitOps.get()}" else ""))
    if (ignoreFailedBatch) {
      log.info(s"Ignore the errors and move on streaming as per " +
        s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
      Success((true, None, None))
    } else {
      if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
      Failure(new HoodieCorruptedDataException(s"Micro batch id=$batchId ended up with errors"))
    }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]