yihua commented on code in PR #5052:
URL: https://github.com/apache/hudi/pull/5052#discussion_r973541985
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -232,16 +232,21 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
&& !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
)) {
// hit a block with instant time greater than should be processed, stop processing further
+ LOG.info("hit a block with instant time greater than should be processed, stop processing further. logfile: + " + logFile
+ + " , blockType: " + logBlock.getBlockType() + " , instantTime: " + instantTime + " , latestInstantTime : " + latestInstantTime);
break;
}
if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) {
- if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
- || inflightInstantsTimeline.containsInstant(instantTime)) {
+ if (!checkIfValidCommit(instantTime) || inflightInstantsTimeline.containsInstant(instantTime)) {
// hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
+ LOG.info("hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one. logfile: + " + logFile
+ + " , blockType: " + logBlock.getBlockType() + " , instantTime: " + instantTime + " , latestInstantTime : " + latestInstantTime);
continue;
}
if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) {
// filter the log block by instant range
+ LOG.info("filter the log block by instant range. logfile: + " + logFile
Review Comment:
Could you make the newly added logging entries at debug level?
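For illustration, the requested change could look roughly like the sketch below. It is a simplified, hypothetical model of the first two skip checks in the hunk above: `DebugLoggingSketch`, `decide`, `describe`, and `Action` are invented names, and a plain `Set` of instant times stands in for the completed and inflight timelines. To stay dependency-free it uses `java.util.logging` with `Level.FINE` in place of DEBUG; with a log4j `Logger` the equivalent calls would be `isDebugEnabled()` and `debug(...)`. It also drops the stray `+ ` that ends up inside the `"logfile: + "` literal in the diff.

```java
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical, simplified model of the first two skip checks in scanInternal.
// A Set of instant times stands in for the completed/inflight timelines, and
// java.util.logging's Level.FINE stands in for DEBUG to avoid a log4j dependency.
class DebugLoggingSketch {
  private static final Logger LOG = Logger.getLogger(DebugLoggingSketch.class.getName());

  enum Action { STOP, SKIP, PROCESS }

  // Builds the diagnostic message; unlike the diff, the literal has no stray "+ ".
  static String describe(String reason, String logFile, String blockType,
                         String instantTime, String latestInstantTime) {
    return reason + ". logfile: " + logFile + ", blockType: " + blockType
        + ", instantTime: " + instantTime + ", latestInstantTime: " + latestInstantTime;
  }

  static Action decide(String logFile, String blockType, String instantTime,
                       String latestInstantTime, Set<String> committedInstants) {
    // Assumes fixed-width timestamp strings, so lexical order equals time order.
    if (instantTime.compareTo(latestInstantTime) > 0) {
      // Guard so the message is only built when debug-level logging is enabled.
      if (LOG.isLoggable(Level.FINE)) {
        LOG.fine(describe("hit a block with instant time greater than should be processed",
            logFile, blockType, instantTime, latestInstantTime));
      }
      return Action.STOP;
    }
    if (!committedInstants.contains(instantTime)) {
      if (LOG.isLoggable(Level.FINE)) {
        LOG.fine(describe("hit an uncommitted block possibly from a failed write",
            logFile, blockType, instantTime, latestInstantTime));
      }
      return Action.SKIP;
    }
    return Action.PROCESS;
  }

  public static void main(String[] args) {
    Set<String> committed = Set.of("001", "002");
    System.out.println(decide("f1.log.1", "AVRO_DATA_BLOCK", "002", "002", committed)); // PROCESS
    System.out.println(decide("f1.log.1", "AVRO_DATA_BLOCK", "003", "002", committed)); // STOP
  }
}
```

The level guard matters because these checks run once per log block. Without it, the concatenated message would be built on every block even when debug output is switched off.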
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -361,6 +366,19 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
}
}
+ /**
+ * Check whether the current instants are valid
+ *
+ * @param instantTime
+ * @return
+ */
+ private boolean checkIfValidCommit(String instantTime) {
+ HoodieTimeline deltaCommitTimeline = this.hoodieTableMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
+ return deltaCommitTimeline.containsInstant(instantTime)
+ || (deltaCommitTimeline.isBeforeTimelineStarts(instantTime)
+ && this.hoodieTableMetaClient.getArchivedTimeline().getDeltaCommitTimeline().filterCompletedInstants().containsOrBeforeTimelineStarts(instantTime));
Review Comment:
For the rollback case described in HUDI-3644, there should be a ROLLBACK_PREVIOUS_BLOCK that invalidates the log block of the previous delta commit. Could you verify whether the fix is actually needed? It would be good to write a reproducible test that demonstrates the issue.
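To make the argument concrete, here is a simplified, hypothetical model of the behavior the comment relies on. It assumes that when the scan meets a ROLLBACK_PREVIOUS_BLOCK command block, it discards the pending data blocks whose instant time matches the command's target instant. `RollbackScanSketch`, `Block`, `Kind`, and `scan` are invented names, not Hudi classes. A reproducible test for the PR could follow the same shape as `main`: write blocks for a failed delta commit, append the rollback command, and check that the scan still returns only the surviving data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a log scan that discards blocks written by a
// rolled-back delta commit when a ROLLBACK_PREVIOUS_BLOCK-style command follows them.
// Block, Kind, and scan() are illustrative names, not Hudi classes.
class RollbackScanSketch {
  enum Kind { DATA, ROLLBACK_COMMAND }

  static final class Block {
    final Kind kind;
    final String instantTime;
    final String targetInstantTime; // only set for rollback command blocks

    private Block(Kind kind, String instantTime, String targetInstantTime) {
      this.kind = kind;
      this.instantTime = instantTime;
      this.targetInstantTime = targetInstantTime;
    }

    static Block data(String instantTime) {
      return new Block(Kind.DATA, instantTime, null);
    }

    static Block rollback(String instantTime, String targetInstantTime) {
      return new Block(Kind.ROLLBACK_COMMAND, instantTime, targetInstantTime);
    }
  }

  // Returns the instant times of the data blocks that survive the scan, in log order.
  static List<String> scan(List<Block> log) {
    List<Block> pending = new ArrayList<>();
    for (Block block : log) {
      if (block.kind == Kind.DATA) {
        pending.add(block);
      } else {
        // Drop every pending data block written by the rolled-back instant.
        pending.removeIf(b -> b.instantTime.equals(block.targetInstantTime));
      }
    }
    List<String> survivors = new ArrayList<>();
    for (Block b : pending) {
      survivors.add(b.instantTime);
    }
    return survivors;
  }

  public static void main(String[] args) {
    // Failed delta commit "002" wrote a data block; its rollback "003" appended a command block.
    List<Block> log = List.of(
        Block.data("001"),
        Block.data("002"),
        Block.rollback("003", "002"),
        Block.data("004"));
    System.out.println(scan(log)); // [001, 004]
  }
}
```

If the real reader already behaves like this model, a block from the failed write is dropped by the command block itself, which is why the comment questions whether the extra archived-timeline check is needed.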