yihua commented on code in PR #7517:
URL: https://github.com/apache/hudi/pull/7517#discussion_r1058599758
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java:
##########
@@ -844,13 +860,56 @@ private boolean hasCommittedLogFiles(
   if (reader.hasNext()) {
     HoodieLogBlock block = reader.next();
     final String instantTime = block.getLogBlockHeader().get(INSTANT_TIME);
-    if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
-        || inflightInstantsTimeline.containsInstant(instantTime)) {
+    if (completedInstantsTimeline.containsInstant(instantTime)) {
+      // The instant is completed, in active timeline
+      // Checking commit metadata only as log files can only be written by COMMIT or DELTA_COMMIT
+      if (!committedFilesMap.containsKey(instantTime)) {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+            completedInstantsTimeline.getInstantDetails(
+                completedInstantsTimeline.filter(i -> i.getTimestamp().equals(instantTime))
+                    .firstInstant().get()
+            ).get(),
+            HoodieCommitMetadata.class
+        );
+        committedFilesMap.put(
+            instantTime,
+            commitMetadata.getWriteStats().stream()
+                .map(HoodieWriteStat::getPath).collect(Collectors.toSet())
+        );
+      }
+
+      // Here we check if the commit metadata contains the log file.
+      // Note that, a log file may be written by a successful write transaction
+      // leading to a delta commit, but such a log file can be uncommitted and
+      // not be part of any snapshot, due to Spark task retries for example.
+      // In such a case, the log file can stay in the file system, but the metadata
+      // table does not contain the log file for file listing, which is an expected
+      // behavior.
+      String relativeLogFilePathStr = getRelativePath(basePath, logFilePathStr);
+      if (committedFilesMap.get(instantTime).contains(relativeLogFilePathStr)) {
+        LOG.warn("Log file is committed in an instant in active timeline: instantTime="
+            + instantTime + " " + logFilePathStr);
+        return true;
+      } else {
+        LOG.warn("Log file is uncommitted in a completed instant, likely due to retry: "
+            + "instantTime=" + instantTime + " " + logFilePathStr);
+      }
+    } else if (completedInstantsTimeline.isBeforeTimelineStarts(instantTime)) {
+      // The instant is in archived timeline
+      LOG.warn("Log file is committed in an instant in archived timeline: instantTime="
+          + instantTime + " " + logFilePathStr);
+      return true;
+    } else if (inflightInstantsTimeline.containsInstant(instantTime)) {
+      // The instant is inflight in active timeline
       // hit an uncommitted block possibly from a failed write
-      LOG.warn("Log file is uncommitted: " + logFilePathStr);
+      LOG.warn("Log file is uncommitted because of an inflight instant: instantTime="
+          + instantTime + " " + logFilePathStr);
     } else {
-      LOG.warn("Log file is committed: " + logFilePathStr);
-      return true;
+      // The instant is after the start of the active timeline,
+      // but it cannot be found in the active timeline
+      LOG.warn("Log file is uncommitted because the instant is after the start of the "
Review Comment:
Yes, that's the logic here. This branch only logs a warning that the log file
is uncommitted and continues processing without returning `true` (returning
`true` means there are committed log files, which causes a mismatch).
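
To make the branch outcomes easier to follow, here is a minimal, self-contained sketch of the decision described above. It is not the validator's code: `LogFileCommitCheck`, `Status`, `classify`, and `countsAsCommitted` are hypothetical names, the timelines are replaced by plain maps and sets, and it assumes instant times are fixed-width strings that order lexicographically.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the branch logic in the diff; none of these names are Hudi APIs.
final class LogFileCommitCheck {

  enum Status {
    COMMITTED_ACTIVE,      // completed instant, commit metadata lists the log file
    UNCOMMITTED_RETRY,     // completed instant, commit metadata does not list the log file
    COMMITTED_ARCHIVED,    // instant is before the start of the active timeline
    UNCOMMITTED_INFLIGHT,  // instant is inflight in the active timeline
    UNCOMMITTED_UNKNOWN    // instant is after the timeline start but not found in it
  }

  // completedInstantToFiles: completed instant time -> relative paths in its commit metadata.
  // activeTimelineStart: earliest instant time in the active timeline (assumed comparable as a string).
  static Status classify(
      String instantTime,
      String relativeLogFilePath,
      Map<String, Set<String>> completedInstantToFiles,
      Set<String> inflightInstants,
      String activeTimelineStart) {
    Set<String> committedFiles = completedInstantToFiles.get(instantTime);
    if (committedFiles != null) {
      return committedFiles.contains(relativeLogFilePath)
          ? Status.COMMITTED_ACTIVE
          : Status.UNCOMMITTED_RETRY;
    }
    if (instantTime.compareTo(activeTimelineStart) < 0) {
      return Status.COMMITTED_ARCHIVED;
    }
    if (inflightInstants.contains(instantTime)) {
      return Status.UNCOMMITTED_INFLIGHT;
    }
    return Status.UNCOMMITTED_UNKNOWN;
  }

  // Only the two committed outcomes correspond to "return true" in the diff;
  // every other outcome just logs a warning and lets processing continue.
  static boolean countsAsCommitted(Status status) {
    return status == Status.COMMITTED_ACTIVE || status == Status.COMMITTED_ARCHIVED;
  }
}
```

In this sketch, the last branch maps to `UNCOMMITTED_UNKNOWN`, for which `countsAsCommitted` returns `false`, matching the warn-and-continue behavior.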