chenshzh commented on PR #6121:
URL: https://github.com/apache/hudi/pull/6121#issuecomment-1207662818
> > @danny0405 would you pls help see this pr for avoiding data loss during compaction due to some write errors such as non-thrown exceptions ?
>
> What kind of exceptions in your production caused the write handle to encounter exceptions then ?
@danny0405 Just as described in the opening part, we have encountered data loss during compaction. After analyzing the code below and validating it online, we found that some exceptions are caught inside `HoodieMergeHandle#writeRecord`; the write handle's processing of that record is interrupted and the record is only marked as a failure in `WriteStatus`, which causes data loss when compacting delta logs into data files. The exceptions can be, for example, an `IOException` thrown from `HoodieFileWriter#writeToFile`, a `HoodieUpsertException`, and so on.
```java
protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
  Option recordMetadata = hoodieRecord.getData().getMetadata();
  if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
    HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
        + hoodieRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
    writeStatus.markFailure(hoodieRecord, failureEx, recordMetadata);
    return false;
  }
  try {
    if (indexedRecord.isPresent() && !isDelete) {
      writeToFile(hoodieRecord.getKey(), (GenericRecord) indexedRecord.get(),
          preserveMetadata && useWriterSchemaForCompaction);
      recordsWritten++;
    } else {
      recordsDeleted++;
    }
    writeStatus.markSuccess(hoodieRecord, recordMetadata);
    // deflate record payload after recording success. This will help users access payload as a
    // part of marking record successful.
    hoodieRecord.deflate();
    return true;
  } catch (Exception e) {
    LOG.error("Error writing record " + hoodieRecord, e);
    writeStatus.markFailure(hoodieRecord, e, recordMetadata);
  }
  return false;
}
```
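To make the failure mode concrete, here is a minimal, self-contained sketch of the behavior described above. All class and method names here are simplified stand-ins, not the real Hudi API: an exception thrown while writing a record is caught and recorded only as an error counter, so the write loop finishes without any exception escaping to the caller.

```java
public class SwallowedErrorSketch {
  // Simplified stand-in for Hudi's WriteStatus: a failure only bumps a counter.
  static class SimpleWriteStatus {
    long totalRecords;
    long totalErrorRecords;

    void markSuccess() {
      totalRecords++;
    }

    void markFailure(Exception e) {
      totalRecords++;
      totalErrorRecords++; // the exception is recorded, not rethrown
    }
  }

  // Mimics the shape of HoodieMergeHandle#writeRecord: any exception is caught
  // and turned into markFailure, so the caller never sees it.
  static boolean writeRecord(String record, SimpleWriteStatus status) {
    try {
      if (record == null) {
        // stand-in for an IOException raised by the underlying file writer
        throw new java.io.IOException("disk write failed");
      }
      status.markSuccess();
      return true;
    } catch (Exception e) {
      status.markFailure(e);
      return false;
    }
  }

  public static void main(String[] args) {
    SimpleWriteStatus status = new SimpleWriteStatus();
    // One of the three records fails to write, but the loop still completes.
    for (String r : new String[] {"a", "b", null}) {
      writeRecord(r, status);
    }
    System.out.println(status.totalErrorRecords + "/" + status.totalRecords); // prints 1/3
    // Unless the committer inspects totalErrorRecords, the failed record is
    // silently dropped once the compaction commit completes.
  }
}
```

The point of the sketch: nothing at the call site signals the failure, so the commit path must check the error counters itself.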
> And what would you want to do when you encounter that ?
What we want to do is described in detail in the opening part as a) -> d).
A completed compaction commit will actually be regarded as part of the snapshot and affect the latest file slices, so it is better to take the `WriteStatus` errors into consideration when deciding whether to commit or roll back a compaction (just as `StreamWriteOperatorCoordinator` already does for delta commits, shown below).
In a word, a compaction commit with errors should be rolled back, warned about, and retried in the next schedule if we think data quality is more important than job stability.
org.apache.hudi.sink.StreamWriteOperatorCoordinator#doCommit
```java
private void doCommit(String instant, List<WriteStatus> writeResults) {
  // commit or rollback
  long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
  long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
  boolean hasErrors = totalErrorRecords > 0;
  if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
    HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
    if (hasErrors) {
      LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total="
          + totalErrorRecords + "/" + totalRecords);
    }
    final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite
        ? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults)
        : Collections.emptyMap();
    boolean success = writeClient.commit(instant, writeResults, Option.of(checkpointCommitMetadata),
        tableState.commitAction, partitionToReplacedFileIds);
    if (success) {
      reset();
      this.ckpMetadata.commitInstant(instant);
      LOG.info("Commit instant [{}] success!", instant);
    } else {
      throw new HoodieException(String.format("Commit instant [%s] failed!", instant));
    }
  } else {
    LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
    LOG.error("The first 100 error messages");
    writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
      LOG.error("Global error for partition path {} and fileID {}: {}",
          ws.getGlobalError(), ws.getPartitionPath(), ws.getFileId());
      if (ws.getErrors().size() > 0) {
        ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " and value " + value));
      }
    });
    // Rolls back instant
    writeClient.rollback(instant);
    throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", instant));
  }
}
```
So we suggest using `FlinkOptions#IGNORE_FAILED` as the commit-or-rollback policy for both delta commits and compaction commits. Otherwise, we can still unexpectedly lose data even though `FlinkOptions#IGNORE_FAILED` is set to false.
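The decision rule we are proposing for the compaction path is essentially the same predicate `doCommit` already applies to delta commits. A minimal sketch of that rule (the helper class and method names here are illustrative, not the real Hudi API):

```java
public class CompactionCommitPolicy {
  // Mirrors the predicate in StreamWriteOperatorCoordinator#doCommit:
  // commit only when there are no error records, or when the user has
  // explicitly opted into ignoring failures (FlinkOptions#IGNORE_FAILED).
  static boolean shouldCommit(long totalErrorRecords, boolean ignoreFailed) {
    return totalErrorRecords == 0 || ignoreFailed;
  }

  public static void main(String[] args) {
    System.out.println(shouldCommit(0, false)); // prints true: clean commit
    System.out.println(shouldCommit(5, false)); // prints false: roll back, retry next schedule
    System.out.println(shouldCommit(5, true));  // prints true: user forced the commit
  }
}
```

When `shouldCommit` returns false, the compaction instant would be rolled back and left for the next schedule to retry, instead of committing a snapshot that silently drops the failed records.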