prashantwason commented on code in PR #8523:
URL: https://github.com/apache/hudi/pull/8523#discussion_r1178653357


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -306,45 +306,43 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
             // written per ingestion batch for a file but in reality we need 
to rollback (B1 & B2)
             // The following code ensures the same rollback block (R1) is used 
to rollback
             // both B1 & B2
-            LOG.info("Reading a command block from file " + logFile.getPath());
             // This is a command block - take appropriate action based on the 
command
             HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
             String targetInstantForCommandBlock =
                 
logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
+            LOG.info(String.format("Reading a command block %s with 
targetInstantTime %s from file %s", commandBlock.getType(), 
targetInstantForCommandBlock,
+                logFile.getPath()));
             switch (commandBlock.getType()) { // there can be different types 
of command blocks
               case ROLLBACK_BLOCK:
-                // Rollback the last read log block
-                // Get commit time from last record block, compare with 
targetCommitTime,
+                // Rollback older read log block(s)
+                // Get commit time from older record blocks, compare with 
targetCommitTime,
                 // rollback only if equal, this is required in scenarios of 
invalid/extra
                 // rollback blocks written due to failures during the rollback 
operation itself
                 // and ensures the same rollback block (R1) is used to 
rollback both B1 & B2 with
-                // same instant_time
-                int numBlocksRolledBack = 0;
-                totalRollbacks.incrementAndGet();
-                while (!currentInstantLogBlocks.isEmpty()) {
-                  HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
+                // same instant_time.
+                final int instantLogBlockSizeBeforeRollback = 
currentInstantLogBlocks.size();
+                currentInstantLogBlocks.removeIf(lastBlock -> {
                   // handle corrupt blocks separately since they may not have 
metadata
                   if (lastBlock.getBlockType() == CORRUPT_BLOCK) {

Review Comment:
   Corrected.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to