[ https://issues.apache.org/jira/browse/HUDI-4406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Danny Chen closed HUDI-4406.
----------------------------
Resolution: Fixed
Fixed via master branch: ed6b7f6aedc2cba0f753a4ee130cef860ecb0801
> Support compaction commit write error resolvement to avoid data loss
> --------------------------------------------------------------------
>
> Key: HUDI-4406
> URL: https://issues.apache.org/jira/browse/HUDI-4406
> Project: Apache Hudi
> Issue Type: Bug
> Components: flink
> Affects Versions: 0.12.0
> Reporter: Shizhi Chen
> Assignee: Shizhi Chen
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 0.13.1, 0.14.0
>
>
> Currently, the commit-or-rollback logic in CompactionCommitSink does not take
> WriteStatus errors into consideration (it only considers a null WriteStatus),
> which can cause data loss when the delta commit log files are compacted into
> new versioned data files.
> For example, org.apache.hudi.io.HoodieMergeHandle#writeRecord catches
> exceptions and only marks the failed record in its WriteStatus instead of
> rethrowing, so records from the log files can be silently dropped:
> {code:java}
> protected boolean writeRecord(HoodieRecord<T> hoodieRecord,
>     Option<IndexedRecord> indexedRecord, boolean isDelete) {
>   Option recordMetadata = hoodieRecord.getData().getMetadata();
>   if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
>     HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
>         + hoodieRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
>     writeStatus.markFailure(hoodieRecord, failureEx, recordMetadata);
>     return false;
>   }
>   try {
>     if (indexedRecord.isPresent() && !isDelete) {
>       writeToFile(hoodieRecord.getKey(), (GenericRecord) indexedRecord.get(),
>           preserveMetadata && useWriterSchemaForCompaction);
>       recordsWritten++;
>     } else {
>       recordsDeleted++;
>     }
>     writeStatus.markSuccess(hoodieRecord, recordMetadata);
>     // deflate record payload after recording success. This will help users access payload as a
>     // part of marking record successful.
>     hoodieRecord.deflate();
>     return true;
>   } catch (Exception e) {
>     LOG.error("Error writing record " + hoodieRecord, e);
>     // The exception is not rethrown: the record is only marked as failed in writeStatus,
>     // so the caller must inspect the WriteStatus errors to notice the dropped record.
>     writeStatus.markFailure(hoodieRecord, e, recordMetadata);
>   }
>   return false;
> }{code}
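To make the gap concrete, here is a minimal, self-contained sketch (not Hudi code). `SimpleWriteStatus` is a hypothetical stand-in for Hudi's `WriteStatus` that only tracks an error count, and `CompactionCheck` is a hypothetical class contrasting a rollback check that only looks for null statuses with one that also counts failed records, which `writeRecord` above records without throwing.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Hudi's WriteStatus: only tracks the failed-record count.
final class SimpleWriteStatus {
  private final long totalErrorRecords;

  SimpleWriteStatus(long totalErrorRecords) {
    this.totalErrorRecords = totalErrorRecords;
  }

  long getTotalErrorRecords() {
    return totalErrorRecords;
  }
}

// Hypothetical helper, not the actual CompactionCommitSink logic.
final class CompactionCheck {
  // Null-only check (sketch): roll back only if some task produced no status.
  static boolean shouldRollbackNullOnly(List<SimpleWriteStatus> statuses) {
    return statuses.stream().anyMatch(s -> s == null);
  }

  // Error-aware check (sketch): also roll back when any record failed to write.
  static boolean shouldRollback(List<SimpleWriteStatus> statuses) {
    return statuses.stream()
        .anyMatch(s -> s == null || s.getTotalErrorRecords() > 0);
  }

  public static void main(String[] args) {
    // One task wrote everything; another swallowed 3 record exceptions.
    List<SimpleWriteStatus> statuses =
        Arrays.asList(new SimpleWriteStatus(0), new SimpleWriteStatus(3));
    System.out.println(shouldRollbackNullOnly(statuses)); // false: commits, 3 records lost
    System.out.println(shouldRollback(statuses));         // true: rolls back
  }
}
```

With the null-only check, the compaction instant would be committed even though three records never reached the new base file; the error-aware check rolls it back instead.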
> StreamWriteOperatorCoordinator, on the other hand, already has a corresponding
> commit-or-rollback process.
> So this PR will:
> a) Also treat WriteStatus errors as a rollback reason in CompactionCommitSink,
> to avoid data loss during compaction.
> b) Unify the commit/rollback handling into a write commit policy and its
> implementations, as described in org.apache.hudi.commit.policy.WriteCommitPolicy,
> consistent with the procedure in StreamWriteOperatorCoordinator.
> c) Allow users to choose, through FlinkOptions#IGNORE_FAILED, whether data
> quality or ingestion stability takes priority.
> To avoid data loss, we suggest that FlinkOptions#IGNORE_FAILED default to
> false, so that write errors trigger a rollback instead of being ignored.
> d) Improve the log output, and fix some minor bugs in it, when committing
> despite errors or rolling back.
>
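The unified handling proposed in items (a) to (c) can be pictured as a small policy object shared by the coordinator and the compaction sink. The sketch below is hypothetical: `CommitPolicy`, `Decision`, `StrictPolicy`, `LenientPolicy`, and `fromIgnoreFailed` are illustrative names, not the actual interface in `org.apache.hudi.commit.policy.WriteCommitPolicy`, and the boolean only mirrors the intent of `FlinkOptions#IGNORE_FAILED`.

```java
// Hypothetical sketch; not Hudi's WriteCommitPolicy API.
enum Decision { COMMIT, ROLLBACK }

interface CommitPolicy {
  // Decide what to do with an instant given its total failed-record count.
  Decision decide(String instant, long totalErrorRecords);

  // Assumed mapping to the intent of FlinkOptions#IGNORE_FAILED:
  // true favours ingestion stability, false favours data quality.
  static CommitPolicy fromIgnoreFailed(boolean ignoreFailed) {
    return ignoreFailed ? new LenientPolicy() : new StrictPolicy();
  }
}

// Data quality first: any failed record rolls the instant back.
final class StrictPolicy implements CommitPolicy {
  @Override
  public Decision decide(String instant, long totalErrorRecords) {
    if (totalErrorRecords > 0) {
      System.err.printf("Rolling back instant %s: %d error records%n", instant, totalErrorRecords);
      return Decision.ROLLBACK;
    }
    return Decision.COMMIT;
  }
}

// Ingestion stability first: commit anyway, but log how many records failed.
final class LenientPolicy implements CommitPolicy {
  @Override
  public Decision decide(String instant, long totalErrorRecords) {
    if (totalErrorRecords > 0) {
      System.err.printf("Committing instant %s despite %d error records%n", instant, totalErrorRecords);
    }
    return Decision.COMMIT;
  }
}
```

For example, `CommitPolicy.fromIgnoreFailed(false).decide("001", 3)` returns `Decision.ROLLBACK`, while the lenient policy commits and only logs. Keeping the decision in one place means regular writes and compaction cannot drift apart in how they treat errors.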
--
This message was sent by Atlassian Jira
(v8.20.10#820010)