chenshzh commented on PR #6121:
URL: https://github.com/apache/hudi/pull/6121#issuecomment-1207662818

   > > @danny0405 would you pls help see this pr for avoiding data loss during 
compaction due to some write errors such as non-thrown exceptions ?
   > 
   > What kind of exceptions in your production caused the write handle to 
encounter exceptions then ? 
   
   @danny0405 Just as described in the opening part, we have encountered data loss during compaction. After analyzing the code below and validating it online, we found that some exceptions are caught inside HoodieMergeHandle#writeRecord; the write handle's processing of that record is interrupted and the record is only marked as a failure in WriteStatus, which causes data loss when compacting delta logs into data files. Such exceptions include IOException from HoodieFileWriter#writeToFile, HoodieUpsertException, and so on.
   
   ```java
     protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
       Option recordMetadata = hoodieRecord.getData().getMetadata();
       if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
         HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
             + hoodieRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
         writeStatus.markFailure(hoodieRecord, failureEx, recordMetadata);
         return false;
       }
       try {
         if (indexedRecord.isPresent() && !isDelete) {
           writeToFile(hoodieRecord.getKey(), (GenericRecord) indexedRecord.get(), preserveMetadata && useWriterSchemaForCompaction);
           recordsWritten++;
         } else {
           recordsDeleted++;
         }
         writeStatus.markSuccess(hoodieRecord, recordMetadata);
         // deflate record payload after recording success. This will help users access payload as a
         // part of marking record successful.
         hoodieRecord.deflate();
         return true;
       } catch (Exception e) {
         LOG.error("Error writing record  " + hoodieRecord, e);
         writeStatus.markFailure(hoodieRecord, e, recordMetadata);
       }
       return false;
     }
   ```
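   To make the failure mode concrete, here is a minimal, self-contained sketch (the class and method names are illustrative stand-ins, not the Hudi API): a per-record try/catch that only records failures never surfaces an exception to the caller, so a failed record is silently dropped unless the commit path inspects the failure counts afterwards.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Simplified model of per-record error accounting in a write handle;
   // names are hypothetical stand-ins, not the real Hudi classes.
   public class WriteHandleSketch {
       final List<String> written = new ArrayList<>();
       final List<String> failed = new ArrayList<>();

       // Mirrors the writeRecord pattern: the exception is caught and only
       // recorded as a failure, so the caller sees no exception at all.
       boolean writeRecord(String key, boolean simulateIoError) {
           try {
               if (simulateIoError) {
                   throw new java.io.IOException("disk write failed");
               }
               written.add(key);
               return true;
           } catch (Exception e) {
               failed.add(key); // markFailure analogue: invisible unless inspected
           }
           return false;
       }

       public static void main(String[] args) {
           WriteHandleSketch handle = new WriteHandleSketch();
           handle.writeRecord("r1", false);
           handle.writeRecord("r2", true); // fails silently
           handle.writeRecord("r3", false);
           // Unless the commit path checks the failure count, "r2" is lost:
           System.out.println(handle.written.size() + " written, " + handle.failed.size() + " failed");
       }
   }
   ```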
   
   > And what would you want to do when you encounter that ?
   
   What we want to do is described in detail in the opening part as steps a) through d).
   
   A completed compaction commit becomes part of the snapshot and affects the latest file slices, so it is better to take the WriteStatus errors into consideration when deciding whether to commit or roll back a compaction (just as StreamWriteOperatorCoordinator already does for delta commits, shown below).
   
   In a word, a compaction commit with errors should be rolled back, warned about, and retried on the next schedule if we consider data quality more important than job stability.
   
   org.apache.hudi.sink.StreamWriteOperatorCoordinator#doCommit
   ```java
     private void doCommit(String instant, List<WriteStatus> writeResults) {
       // commit or rollback
       long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
       long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
       boolean hasErrors = totalErrorRecords > 0;

       if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
         HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
         if (hasErrors) {
           LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total="
               + totalErrorRecords + "/" + totalRecords);
         }

         final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite
             ? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults)
             : Collections.emptyMap();
         boolean success = writeClient.commit(instant, writeResults, Option.of(checkpointCommitMetadata),
             tableState.commitAction, partitionToReplacedFileIds);
         if (success) {
           reset();
           this.ckpMetadata.commitInstant(instant);
           LOG.info("Commit instant [{}] success!", instant);
         } else {
           throw new HoodieException(String.format("Commit instant [%s] failed!", instant));
         }
       } else {
         LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
         LOG.error("The first 100 error messages");
         writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
           LOG.error("Global error for partition path {} and fileID {}: {}",
               ws.getGlobalError(), ws.getPartitionPath(), ws.getFileId());
           if (ws.getErrors().size() > 0) {
             ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " and value " + value));
           }
         });
         // Rolls back instant
         writeClient.rollback(instant);
         throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", instant));
       }
     }
   ```
   
   So we suggest honoring FlinkOptions#IGNORE_FAILED in the commit-or-rollback policy for both delta commits and compaction commits. Otherwise, we can still unexpectedly lose data even when FlinkOptions#IGNORE_FAILED is set to false.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
