[ 
https://issues.apache.org/jira/browse/HUDI-4406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Chen closed HUDI-4406.
----------------------------
    Resolution: Fixed

Fixed via master branch: ed6b7f6aedc2cba0f753a4ee130cef860ecb0801

> Support compaction commit write error resolvement to avoid data loss
> --------------------------------------------------------------------
>
>                 Key: HUDI-4406
>                 URL: https://issues.apache.org/jira/browse/HUDI-4406
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>    Affects Versions: 0.12.0
>            Reporter: Shizhi Chen
>            Assignee: Shizhi Chen
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.13.1, 0.14.0
>
>
> Currently, the CompactionCommitSink commit/rollback logic does not take write-status errors into account (it only checks for a null write status), which can cause data loss when compacting the delta commit log files into the new versioned data files.
> For example, exceptions thrown in org.apache.hudi.io.HoodieMergeHandle#writeRecord will lead to data loss from the log files:
> {code:java}
>   protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
>     Option recordMetadata = hoodieRecord.getData().getMetadata();
>     if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
>       HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
>           + hoodieRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
>       writeStatus.markFailure(hoodieRecord, failureEx, recordMetadata);
>       return false;
>     }
>     try {
>       if (indexedRecord.isPresent() && !isDelete) {
>         writeToFile(hoodieRecord.getKey(), (GenericRecord) indexedRecord.get(), preserveMetadata && useWriterSchemaForCompaction);
>         recordsWritten++;
>       } else {
>         recordsDeleted++;
>       }
>       writeStatus.markSuccess(hoodieRecord, recordMetadata);
>       // deflate record payload after recording success. This will help users
>       // access payload as a part of marking record successful.
>       hoodieRecord.deflate();
>       return true;
>     } catch (Exception e) {
>       LOG.error("Error writing record " + hoodieRecord, e);
>       writeStatus.markFailure(hoodieRecord, e, recordMetadata);
>     }
>     return false;
>   }
> {code}
> It is known that StreamWriteOperatorCoordinator already has a related commit/rollback handling process.
> So this PR will:
> a) Add write-status errors as a rollback reason for CompactionCommitSink, so a failed compaction is rolled back instead of silently losing data.
> b) Unify the handling procedure for the write commit policy with its implementations, as described in org.apache.hudi.commit.policy.WriteCommitPolicy, consolidating it with that of StreamWriteOperatorCoordinator.
> c) Control whether data quality or ingestion stability takes priority through FlinkOptions#IGNORE_FAILED. We suggest that FlinkOptions#IGNORE_FAILED default to true to avoid data loss.
> d) Optimize and fix some minor bugs in the log traces when committing on error or rolling back.
>  
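To make the decision in items a) and c) concrete, here is a minimal, self-contained sketch of the commit-or-rollback check. It is not the actual Hudi implementation: the class names (WriteStatusSketch, CompactionCommitSketch) and the hasErrors/canCommit methods are hypothetical stand-ins for Hudi's WriteStatus error accounting and the CompactionCommitSink logic, and ignoreFailed models the FlinkOptions#IGNORE_FAILED switch.

```java
import java.util.List;

// Simplified stand-in for Hudi's WriteStatus, tracking only the error count.
class WriteStatusSketch {
  private final long totalErrorRecords;

  WriteStatusSketch(long totalErrorRecords) {
    this.totalErrorRecords = totalErrorRecords;
  }

  boolean hasErrors() {
    return totalErrorRecords > 0;
  }
}

public class CompactionCommitSketch {
  /**
   * Decide whether a compaction commit may proceed. Before this fix, only a
   * null write status blocked the commit; the idea here is that a status
   * carrying write errors must also block it (triggering a rollback),
   * unless ignoreFailed explicitly prioritizes ingestion stability.
   */
  static boolean canCommit(List<WriteStatusSketch> statuses, boolean ignoreFailed) {
    if (ignoreFailed) {
      return true; // ingestion stability prioritized over data quality
    }
    for (WriteStatusSketch status : statuses) {
      if (status == null || status.hasErrors()) {
        return false; // an error write status now also blocks the commit
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<WriteStatusSketch> clean = List.of(new WriteStatusSketch(0));
    List<WriteStatusSketch> failed = List.of(new WriteStatusSketch(3));
    System.out.println(canCommit(clean, false));  // commits
    System.out.println(canCommit(failed, false)); // rolls back
    System.out.println(canCommit(failed, true));  // commits anyway
  }
}
```

With ignoreFailed false (the suggested data-loss-avoiding default in item c) as it applies to compaction), any errored write status forces a rollback so the delta log files are retried rather than dropped.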



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
