yihua commented on code in PR #10922:
URL: https://github.com/apache/hudi/pull/10922#discussion_r1538065567
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -281,15 +269,11 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
LOG.info("Reading a data block from file " + logFile.getPath() + " at instant " + instantTime);
// store the current block
currentInstantLogBlocks.push(logBlock);
- validLogBlockInstants.add(logBlock);
Review Comment:
Previously, the following logic ran before
`currentInstantLogBlocks.push(logBlock);` (see #9545):
```
LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
    + logBlock.getLogBlockHeader().get(INSTANT_TIME));
if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
  // If this is an avro data block belonging to a different commit/instant,
  // then merge the last blocks and records into the main result
  processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
```
Is that still needed for branch-0.x? Could you also raise a PR against
`branch-0.x` if the logic differs between the 1.x and 0.x release lines?
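
For context, here is a minimal sketch of the flush-on-new-instant pattern that the check above implements. The names `InstantFlushSketch`, `Block`, and `scan` are hypothetical and not Hudi APIs. The sketch assumes a block counts as "new" when its instant time differs from the block at the head of the queue, and it leaves out the `readBlocksLazily` condition.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch, not Hudi code: queue blocks per instant and merge the
// queue whenever a block from a different instant arrives.
class InstantFlushSketch {

  // Stand-in for a log block; only the instant time matters for this sketch.
  static final class Block {
    final String instantTime;

    Block(String instantTime) {
      this.instantTime = instantTime;
    }
  }

  // Returns one batch per merge, in the order the merges happened.
  static List<List<Block>> scan(List<Block> blocks) {
    Deque<Block> queued = new ArrayDeque<>();
    List<List<Block>> merged = new ArrayList<>();
    for (Block block : blocks) {
      // Assumption: "new instant" means the instant time differs from the
      // most recently pushed block (the head of the deque).
      boolean isNewInstant = !queued.isEmpty()
          && !queued.peek().instantTime.equals(block.instantTime);
      if (isNewInstant) {
        // Merge everything queued for the previous instant before moving on.
        merged.add(new ArrayList<>(queued));
        queued.clear();
      }
      queued.push(block);
    }
    // Merge whatever is still queued once all blocks have been read.
    if (!queued.isEmpty()) {
      merged.add(new ArrayList<>(queued));
    }
    return merged;
  }

  public static void main(String[] args) {
    List<List<Block>> batches = scan(Arrays.asList(
        new Block("001"), new Block("001"), new Block("002")));
    System.out.println("batches merged: " + batches.size());
  }
}
```

Without the in-loop check, all three blocks would only be merged in the final step as a single batch, which is the behavioural difference the question above is about.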
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -370,33 +335,16 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
totalCorruptBlocks.incrementAndGet();
// If there is a corrupt block - we will assume that this was the next data block
currentInstantLogBlocks.push(logBlock);
- validLogBlockInstants.add(logBlock);
- // we don't need to update the block sequence tracker here, since the block sequence tracker is meant to remove additional/spurious valid logblocks.
- // anyway, contents of corrupt blocks are not read.
break;
default:
throw new UnsupportedOperationException("Block type not supported yet");
}
}
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
- boolean duplicateBlocksDetected = false;
- if (blockIdentifiersPresent.get()) {
- Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo = reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants, blockSequenceMapPerCommit);
- duplicateBlocksDetected = dedupedLogBlocksInfo.getKey();
- if (duplicateBlocksDetected) {
- // if there are duplicate log blocks that needs to be removed, we re-create the queue for valid log blocks from dedupedLogBlocks
- currentInstantLogBlocks = new ArrayDeque<>();
- dedupedLogBlocksInfo.getValue().forEach(block -> currentInstantLogBlocks.push(block));
- LOG.info("Merging the final data blocks");
- processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
- }
- }
- if (!duplicateBlocksDetected) {
- // if there are no dups, we can take currentInstantLogBlocks as is.
- LOG.info("Merging the final data blocks");
- processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
- }
+ // if there are no dups, we can take currentInstantLogBlocks as is.
+ LOG.info("Merging the final data blocks");
+ processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
Review Comment:
The changes around `HoodieAppendHandle#addBlockIdentifier` and
`TaskContextSupplier#getAttemptNumberSupplier` introduced by #9611 should also
be reverted.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -281,15 +269,11 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
LOG.info("Reading a data block from file " + logFile.getPath() + " at instant " + instantTime);
// store the current block
currentInstantLogBlocks.push(logBlock);
- validLogBlockInstants.add(logBlock);
- updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber, attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent);
break;
case DELETE_BLOCK:
LOG.info("Reading a delete block from file " + logFile.getPath());
// store deletes so can be rolled back
currentInstantLogBlocks.push(logBlock);
- validLogBlockInstants.add(logBlock);
Review Comment:
The same applies here for delete block processing (see #9545).