yihua commented on code in PR #10922:
URL: https://github.com/apache/hudi/pull/10922#discussion_r1538065567


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -281,15 +269,11 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
             LOG.info("Reading a data block from file " + logFile.getPath() + " at instant " + instantTime);
             // store the current block
             currentInstantLogBlocks.push(logBlock);
-            validLogBlockInstants.add(logBlock);

Review Comment:
   Previously, the logic below ran before `currentInstantLogBlocks.push(logBlock);` (see #9545):
   ```
   LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
                   + logBlock.getLogBlockHeader().get(INSTANT_TIME));
               if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
                 // If this is an avro data block belonging to a different commit/instant,
                 // then merge the last blocks and records into the main result
                 processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
               }
   ```
   Is that still needed for branch-0.x?  Could you also raise a PR against `branch-0.x` if the logic differs between the 1.x and 0.x release lines?
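   For context, here is a simplified, hypothetical sketch of the decision that check makes: a block triggers an eager merge when its instant time differs from the block currently at the top of the queue. This is not Hudi's actual implementation; `Block` below is a stand-in for `HoodieLogBlock`, carrying only the instant time from the block header:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of an isNewInstantBlock-style check (illustrative only,
// not Hudi's actual code): the queue holds blocks of the instant currently
// being accumulated; a block with a different instant time signals that the
// queued blocks can be merged into the main result.
class InstantBlockCheck {
    // stand-in for HoodieLogBlock; only the header's instant time matters here
    record Block(String instantTime) {}

    static boolean isNewInstantBlock(Deque<Block> queued, Block incoming) {
        // an empty queue means nothing accumulated yet, so nothing to merge
        return !queued.isEmpty()
            && !queued.peek().instantTime().equals(incoming.instantTime());
    }
}
```

   In the non-lazy path this check gated `processQueuedBlocksForInstant`, so each instant's blocks were merged as soon as the scanner crossed an instant boundary rather than all at the end.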



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -370,33 +335,16 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
             totalCorruptBlocks.incrementAndGet();
             // If there is a corrupt block - we will assume that this was the next data block
             currentInstantLogBlocks.push(logBlock);
-            validLogBlockInstants.add(logBlock);
-            // we don't need to update the block sequence tracker here, since the block sequence tracker is meant to remove additional/spurious valid logblocks.
-            // anyway, contents of corrupt blocks are not read.
             break;
           default:
             throw new UnsupportedOperationException("Block type not supported yet");
         }
       }
       // merge the last read block when all the blocks are done reading
       if (!currentInstantLogBlocks.isEmpty()) {
-        boolean duplicateBlocksDetected = false;
-        if (blockIdentifiersPresent.get()) {
-          Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo = reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants, blockSequenceMapPerCommit);
-          duplicateBlocksDetected = dedupedLogBlocksInfo.getKey();
-          if (duplicateBlocksDetected) {
-            // if there are duplicate log blocks that needs to be removed, we re-create the queue for valid log blocks from dedupedLogBlocks
-            currentInstantLogBlocks = new ArrayDeque<>();
-            dedupedLogBlocksInfo.getValue().forEach(block -> currentInstantLogBlocks.push(block));
-            LOG.info("Merging the final data blocks");
-            processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
-          }
-        }
-        if (!duplicateBlocksDetected) {
-          // if there are no dups, we can take currentInstantLogBlocks as is.
-          LOG.info("Merging the final data blocks");
-          processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
-        }
+        // if there are no dups, we can take currentInstantLogBlocks as is.
+        LOG.info("Merging the final data blocks");
+        processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);

Review Comment:
   The changes around `HoodieAppendHandle#addBlockIdentifier` and `TaskContextSupplier#getAttemptNumberSupplier` introduced by #9611 should also be reverted.
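   To summarize the idea being reverted here: the block-sequence tracking detected spurious duplicate blocks written by task retries and kept, per commit, only the blocks from the highest attempt number. A hypothetical, simplified sketch of that reconciliation (names like `Block` and `keepHighestAttempt` are illustrative, not Hudi's actual `reconcileSpuriousBlocksAndGetValidOnes` API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the reverted dedup logic: if a task retry re-wrote
// log blocks for the same commit, blocks from lower attempt numbers are
// treated as spurious and dropped, keeping only the highest attempt's blocks.
class SpuriousBlockDedup {
    // stand-in for HoodieLogBlock plus its block-identifier metadata
    record Block(String instantTime, int attemptNumber) {}

    static List<Block> keepHighestAttempt(List<Block> blocks) {
        // first pass: find the highest attempt number seen per commit/instant
        Map<String, Integer> maxAttempt = new HashMap<>();
        for (Block b : blocks) {
            maxAttempt.merge(b.instantTime(), b.attemptNumber(), Math::max);
        }
        // second pass: keep only blocks written by that highest attempt
        List<Block> valid = new ArrayList<>();
        for (Block b : blocks) {
            if (b.attemptNumber() == maxAttempt.get(b.instantTime())) {
                valid.add(b);
            }
        }
        return valid;
    }
}
```

   Reverting this means the reader once again takes `currentInstantLogBlocks` as-is, so the write path must guarantee retries do not leave duplicate blocks behind.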



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -281,15 +269,11 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
             LOG.info("Reading a data block from file " + logFile.getPath() + " at instant " + instantTime);
             // store the current block
             currentInstantLogBlocks.push(logBlock);
-            validLogBlockInstants.add(logBlock);
-            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber, attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent);
             break;
           case DELETE_BLOCK:
             LOG.info("Reading a delete block from file " + logFile.getPath());
             // store deletes so can be rolled back
             currentInstantLogBlocks.push(logBlock);
-            validLogBlockInstants.add(logBlock);

Review Comment:
   The same applies here for delete block processing (see #9545).


