nsivabalan commented on code in PR #5958:
URL: https://github.com/apache/hudi/pull/5958#discussion_r956600281


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
     }
   }
 
+  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {

Review Comment:
   The existing scan method is synchronized, btw. Can we make this method synchronized as well?
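A minimal sketch of why this is enough. It uses a hypothetical `ScanLockSketch` class in place of the reader, with `scanV1`/`scanV2` standing in for `scanInternal`/`scanInternalV2`. Synchronized instance methods all lock the same monitor (`this`), so marking both scans `synchronized` keeps a V1 scan and a V2 scan on the same reader instance from interleaving.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the record reader; only the locking pattern is illustrated.
class ScanLockSketch {
  private int activeScans = 0;
  private int maxConcurrentScans = 0;

  // Both scan variants lock on `this`, so they exclude each other as well as themselves.
  synchronized void scanV1() {
    runScanBody();
  }

  synchronized void scanV2() {
    runScanBody();
  }

  // Only ever called while holding the monitor; tracks how many scans are in flight.
  private void runScanBody() {
    activeScans++;
    maxConcurrentScans = Math.max(maxConcurrentScans, activeScans);
    Thread.yield(); // give other threads a chance to try to enter concurrently
    activeScans--;
  }

  synchronized int maxConcurrentScans() {
    return maxConcurrentScans;
  }

  // Fires mixed V1/V2 scans from several threads and reports the peak overlap.
  static int runDemo() {
    ScanLockSketch reader = new ScanLockSketch();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 1_000; i++) {
      final boolean useV2 = i % 2 == 0;
      pool.execute(() -> {
        if (useV2) {
          reader.scanV2();
        } else {
          reader.scanV1();
        }
      });
    }
    pool.shutdown();
    try {
      pool.awaitTermination(30, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return reader.maxConcurrentScans();
  }

  public static void main(String[] args) {
    System.out.println("peak concurrent scans: " + runDemo()); // prints "peak concurrent scans: 1"
  }
}
```

If either method dropped `synchronized`, the peak could exceed 1 whenever a V1 and a V2 scan ran together.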



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
     }
   }
 
+  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {

Review Comment:
   Can we add a factory and introduce separate readers for the different versions, so that only the scan() method needs to be overridden?
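One possible shape for this, sketched under assumptions. Every name below (`LogRecordReaderFactory`, `ScanVersion`, `LogRecordReaderSketch`, `ScanV1Reader`, `ScanV2Reader`) is hypothetical and not part of Hudi. The base class keeps the shared state, each versioned subclass overrides only `scan()`, and the scan bodies are placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical factory: the only place that decides which scan implementation is used.
final class LogRecordReaderFactory {
  private LogRecordReaderFactory() {
  }

  static LogRecordReaderSketch create(ScanVersion version) {
    switch (version) {
      case V2:
        return new ScanV2Reader();
      case V1:
      default:
        return new ScanV1Reader();
    }
  }

  public static void main(String[] args) {
    LogRecordReaderSketch reader = LogRecordReaderFactory.create(ScanVersion.V2);
    reader.scan(Arrays.asList("B1", "B2"));
    System.out.println(reader.processed()); // prints [v2:B1, v2:B2]
  }
}

enum ScanVersion { V1, V2 }

abstract class LogRecordReaderSketch {
  // Shared state and helpers stay in the base class.
  protected final List<String> processed = new ArrayList<>();

  // The only version-specific entry point.
  abstract void scan(List<String> blocks);

  List<String> processed() {
    return processed;
  }
}

class ScanV1Reader extends LogRecordReaderSketch {
  @Override
  synchronized void scan(List<String> blocks) {
    // Placeholder for the existing single-pass scan.
    for (String block : blocks) {
      processed.add("v1:" + block);
    }
  }
}

class ScanV2Reader extends LogRecordReaderSketch {
  @Override
  synchronized void scan(List<String> blocks) {
    // Placeholder for the two-pass scan (forward collection, then reverse placement).
    for (String block : blocks) {
      processed.add("v2:" + block);
    }
  }
}
```

With this layout the version choice lives in one place, so callers do not branch on a `useScanV2` flag themselves.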



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -170,6 +180,7 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<Str
     this.forceFullScan = forceFullScan;
     this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
     this.path = basePath;
+    this.useScanV2 = useScanV2;

Review Comment:
   +1



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
     }
   }
 
+  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
+    currentInstantLogBlocks = new ArrayDeque<>();
+    progress = 0.0f;
+    totalLogFiles = new AtomicLong(0);
+    totalRollbacks = new AtomicLong(0);
+    totalCorruptBlocks = new AtomicLong(0);
+    totalLogBlocks = new AtomicLong(0);
+    totalLogRecords = new AtomicLong(0);
+    HoodieLogFormatReader logFormatReaderWrapper = null;
+    HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
+    HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
+    HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
+    try {
+
+      // Get the key field based on populate meta fields config
+      // and the table type
+      final String keyField = getKeyField();
+
+      boolean enableRecordLookups = !forceFullScan;
+      // Iterate over the paths
+      logFormatReaderWrapper = new HoodieLogFormatReader(fs,
+          logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
+          readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
+
+      /**
+       * Scanning log blocks and placing the compacted blocks at the right place require two traversals.
+       * First traversal to identify the rollback blocks and valid data and compacted blocks.
+       *
+       * Scanning blocks is easy to do in single writer mode, where the rollback block is right after the effected data blocks.
+       * With multiwriter mode the blocks can be out of sync. An example scenario.
+       * B1, B2, B3, B4, R1(B3), B5
+       * In this case, rollback block R1 is invalidating the B3 which is not the previous block.
+       * This becomes more complicated if we have compacted blocks, which are data blocks created using log compaction.
+       *
+       * To solve this, run a single traversal, collect all the valid blocks that are not corrupted
+       * along with the block instant times and rollback block's target instant times.
+       *
+       * As part of second traversal iterate block instant times in reverse order.
+       * While iterating in reverse order keep a track of final compacted instant times for each block.
+       * In doing so, when a data block is seen include the final compacted block if it is not already added.
+       *
+       * find the final compacted block which contains the merged contents.
+       * For example B1 and B2 are merged and created a compacted block called M1 and now M1, B3 and B4 are merged and
+       * created another compacted block called M2. So, now M2 is the final block which contains all the changes of B1,B2,B3,B4.
+       * So, blockTimeToCompactionBlockTimeMap will look like
+       * (B1 -> M2), (B2 -> M2), (B3 -> M2), (B4 -> M2), (M1 -> M2)
+       * This map is updated while iterating and is used to place the compacted blocks in the correct position.
+       * This way we can have multiple layers of merge blocks and still be able to find the correct positions of merged blocks.
+       */
+
+      // Collect targetRollbackInstants, using which we can determine which blocks are invalid.
+      Set<String> targetRollbackInstants = new HashSet<>();
+
+      // This holds block instant time to list of blocks. Note here the log blocks can be normal data blocks or compacted log blocks.
+      Map<String, List<HoodieLogBlock>> instantToBlocksMap = new HashMap<>();
+
+      // Order of Instants.
+      List<String> orderedInstantsList = new ArrayList<>();
+
+      Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
+
+      /*
+       * 1. First step to traverse in forward direction. While traversing the log blocks collect following,
+       *    a. instant times
+       *    b. instant to logblocks map.
+       *    c. targetRollbackInstants.
+       */
+      while (logFormatReaderWrapper.hasNext()) {
+        HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
+        LOG.info("Scanning log file " + logFile);
+        scannedLogFiles.add(logFile);
+        totalLogFiles.set(scannedLogFiles.size());
+        // Use the HoodieLogFileReader to iterate through the blocks in the log file
+        HoodieLogBlock logBlock = logFormatReaderWrapper.next();
+        final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
+        totalLogBlocks.incrementAndGet();
+        // Ignore the corrupt blocks. No further handling is required for them.
+        if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
+          LOG.info("Found a corrupt block in " + logFile.getPath());
+          totalCorruptBlocks.incrementAndGet();
+          continue;
+        }
+        if (!HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME),
+            HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime)) {
+          // hit a block with instant time greater than should be processed, stop processing further
+          break;
+        }
+        if (logBlock.getBlockType() != COMMAND_BLOCK) {
+          if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
+              || inflightInstantsTimeline.containsInstant(instantTime)) {
+            // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
+            continue;
+          }
+          if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) {
+            // filter the log block by instant range
+            continue;
+          }
+        }
+
+        switch (logBlock.getBlockType()) {
+          case HFILE_DATA_BLOCK:
+          case AVRO_DATA_BLOCK:
+          case DELETE_BLOCK:
+            List<HoodieLogBlock> logBlocksList = instantToBlocksMap.getOrDefault(instantTime, new ArrayList<>());
+            if (logBlocksList.size() == 0) {
+              // Keep a track of instant Times in the order of arrival.
+              orderedInstantsList.add(instantTime);
+            }
+            logBlocksList.add(logBlock);
+            instantToBlocksMap.put(instantTime, logBlocksList);
+            break;
+          case COMMAND_BLOCK:
+            LOG.info("Reading a command block from file " + logFile.getPath());
+            // This is a command block - take appropriate action based on the command
+            HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
+
+            // Rollback blocks contain information of instants that are failed, collect them in a set..
+            if (commandBlock.getType().equals(ROLLBACK_BLOCK)) {
+              totalRollbacks.incrementAndGet();
+              String targetInstantForCommandBlock =
+                  logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
+              targetRollbackInstants.add(targetInstantForCommandBlock);
+            } else {
+              throw new UnsupportedOperationException("Command type not yet supported.");
+            }
+            break;
+          default:
+            throw new UnsupportedOperationException("Block type not yet supported.");
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ordered instant times seen " + orderedInstantsList);
+      }
+
+      int numBlocksRolledBack = 0;
+
+      // All the block's instants time that are added to the queue are collected in this set.
+      Set<String> instantTimesIncluded = new HashSet<>();
+
+      // Key will have details related to instant time and value will be empty if that instant is not compacted.
+      // Ex: B1(i1), B2(i2), CB(i3,[i1,i2]) entries will be like i1 -> i3, i2 -> i3.
+      Map<String, String> blockTimeToCompactionBlockTimeMap = new HashMap<>();
+
+      /*
+       * 2. Iterate the instants list in reverse order to get the latest instants first.
+       *    While iterating update the blockTimeToCompactionBlockTimesMap and include the compacted blocks in right position.
+       */
+      for (int i = orderedInstantsList.size() - 1; i >= 0; i--) {
+        String instantTime = orderedInstantsList.get(i);
+
+        // Exclude the blocks which are included in targetRollbackInstants set.
+        // Here, rollback can include instants affiliated to deltacommits or log compaction commits.
+        if (targetRollbackInstants.contains(instantTime)) {
+          numBlocksRolledBack += instantToBlocksMap.get(instantTime).size();
+          continue;
+        }
+        List<HoodieLogBlock> instantsBlocks = instantToBlocksMap.get(instantTime);
+        if (instantsBlocks.size() == 0) {
+          throw new HoodieException("Data corrupted while writing. Found zero blocks for an instant " + instantTime);
+        }
+        HoodieLogBlock firstBlock = instantsBlocks.get(0);
+
+        // For compacted blocks COMPACTED_BLOCK_TIMES entry is present under its headers.
+        if (firstBlock.getLogBlockHeader().containsKey(COMPACTED_BLOCK_TIMES)) {
+          // When compacted blocks are seen update the blockTimeToCompactionBlockTimeMap.
+          Arrays.stream(firstBlock.getLogBlockHeader().get(COMPACTED_BLOCK_TIMES).split(","))
+              .forEach(originalInstant -> {
+                String finalInstant = blockTimeToCompactionBlockTimeMap.get(instantTime);
+                blockTimeToCompactionBlockTimeMap.put(originalInstant, finalInstant);
+              });
+        } else {
+          // When a data block is found check if it is already compacted.
+          String compactedFinalInstantTime = blockTimeToCompactionBlockTimeMap.get(instantTime);
+          if (compactedFinalInstantTime == null) {
+            // If it is not compacted then add the blocks related to the instant time at the end of the queue and continue.
+            instantToBlocksMap.get(instantTime).forEach(block -> currentInstantLogBlocks.addLast(block));
+            instantTimesIncluded.add(instantTime);
+            validBlockInstants.add(instantTime);
+            continue;
+          }
+          // If the compacted block exists and it is already included in the dequeue then ignore and continue.
+          if (instantTimesIncluded.contains(compactedFinalInstantTime)) {
+            continue;
+          }
+          // If the compacted block exists and it is not already added then add all the blocks related to that instant time.
+          instantToBlocksMap.get(compactedFinalInstantTime).forEach(block -> currentInstantLogBlocks.addLast(block));

Review Comment:
   same here.
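For readers following the Javadoc in the hunk above, here is a simplified sketch of the first (forward) traversal on its multi-writer example `B1, B2, B3, B4, R1(B3), B5`. `SimpleBlock` and `FirstPassSketch` are hypothetical stand-ins, not Hudi classes. Instants `i1`..`i5` play the role of B1..B5, with one extra corrupt block. The timeline and instant-range checks are left out; only corrupt-block skipping, rollback-target collection and arrival ordering are modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class FirstPassSketch {
  // Hypothetical, simplified model of a log block; not Hudi's HoodieLogBlock.
  static final class SimpleBlock {
    final String instantTime;
    final String rollbackTarget; // non-null only for rollback command blocks
    final boolean corrupt;

    SimpleBlock(String instantTime, String rollbackTarget, boolean corrupt) {
      this.instantTime = instantTime;
      this.rollbackTarget = rollbackTarget;
      this.corrupt = corrupt;
    }

    static SimpleBlock data(String instant) { return new SimpleBlock(instant, null, false); }
    static SimpleBlock rollback(String instant, String target) { return new SimpleBlock(instant, target, false); }
    static SimpleBlock corruptBlock(String instant) { return new SimpleBlock(instant, null, true); }
  }

  // Forward pass: record data-block instants in arrival order and collect every rollback
  // target wherever the rollback block appears; then drop the rolled-back instants.
  static List<String> validInstantsInOrder(List<SimpleBlock> blocks) {
    List<String> orderedInstants = new ArrayList<>();
    Map<String, List<SimpleBlock>> instantToBlocks = new HashMap<>();
    Set<String> rollbackTargets = new HashSet<>();

    for (SimpleBlock block : blocks) {
      if (block.corrupt) {
        continue; // corrupt blocks need no further handling
      }
      if (block.rollbackTarget != null) {
        rollbackTargets.add(block.rollbackTarget);
        continue;
      }
      List<SimpleBlock> sameInstant =
          instantToBlocks.computeIfAbsent(block.instantTime, k -> new ArrayList<>());
      if (sameInstant.isEmpty()) {
        orderedInstants.add(block.instantTime); // first block seen for this instant
      }
      sameInstant.add(block);
    }

    List<String> valid = new ArrayList<>();
    for (String instant : orderedInstants) {
      if (!rollbackTargets.contains(instant)) {
        valid.add(instant);
      }
    }
    return valid;
  }

  public static void main(String[] args) {
    // R1 invalidates i3 (B3) even though B3 is not the block right before it.
    List<SimpleBlock> log = Arrays.asList(
        SimpleBlock.data("i1"), SimpleBlock.data("i2"), SimpleBlock.data("i3"),
        SimpleBlock.data("i4"), SimpleBlock.rollback("r1", "i3"),
        SimpleBlock.corruptBlock("i6"), SimpleBlock.data("i5"));
    System.out.println(validInstantsInOrder(log)); // prints [i1, i2, i4, i5]
  }
}
```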



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
     }
   }
 
+  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
+      for (int i = orderedInstantsList.size() - 1; i >= 0; i--) {
+        String instantTime = orderedInstantsList.get(i);
+
+        // Exclude the blocks which are included in targetRollbackInstants set.
+        // Here, rollback can include instants affiliated to deltacommits or log compaction commits.
+        if (targetRollbackInstants.contains(instantTime)) {
+          numBlocksRolledBack += instantToBlocksMap.get(instantTime).size();
+          continue;
+        }
+        List<HoodieLogBlock> instantsBlocks = instantToBlocksMap.get(instantTime);
+        if (instantsBlocks.size() == 0) {
+          throw new HoodieException("Data corrupted while writing. Found zero blocks for an instant " + instantTime);
+        }
+        HoodieLogBlock firstBlock = instantsBlocks.get(0);
+
+        // For compacted blocks COMPACTED_BLOCK_TIMES entry is present under its headers.
+        if (firstBlock.getLogBlockHeader().containsKey(COMPACTED_BLOCK_TIMES)) {
+          // When compacted blocks are seen update the blockTimeToCompactionBlockTimeMap.
+          Arrays.stream(firstBlock.getLogBlockHeader().get(COMPACTED_BLOCK_TIMES).split(","))
+              .forEach(originalInstant -> {
+                String finalInstant = blockTimeToCompactionBlockTimeMap.get(instantTime);
+                blockTimeToCompactionBlockTimeMap.put(originalInstant, finalInstant);
+              });
+        } else {
+          // When a data block is found check if it is already compacted.
+          String compactedFinalInstantTime = blockTimeToCompactionBlockTimeMap.get(instantTime);
+          if (compactedFinalInstantTime == null) {
+            // If it is not compacted then add the blocks related to the instant time at the end of the queue and continue.
+            instantToBlocksMap.get(instantTime).forEach(block -> currentInstantLogBlocks.addLast(block));

Review Comment:
   We can use `instantsBlocks` here:
   ```
   instantsBlocks.forEach(block -> currentInstantLogBlocks.addLast(block));
   ```
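A standalone check of the suggestion, with `String` standing in for `HoodieLogBlock` (`EnqueueSketch` and its method names are illustrative only). The lambda, the method reference `currentInstantLogBlocks::addLast`, and `addAll` all append at the tail in the list's iteration order, so any of them can replace the extra `instantToBlocksMap.get(instantTime)` lookup.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class EnqueueSketch {
  // Strings stand in for HoodieLogBlock instances.
  static Deque<String> withLambda(List<String> instantsBlocks) {
    Deque<String> currentInstantLogBlocks = new ArrayDeque<>();
    instantsBlocks.forEach(block -> currentInstantLogBlocks.addLast(block));
    return currentInstantLogBlocks;
  }

  static Deque<String> withMethodReference(List<String> instantsBlocks) {
    Deque<String> currentInstantLogBlocks = new ArrayDeque<>();
    instantsBlocks.forEach(currentInstantLogBlocks::addLast);
    return currentInstantLogBlocks;
  }

  static Deque<String> withAddAll(List<String> instantsBlocks) {
    Deque<String> currentInstantLogBlocks = new ArrayDeque<>();
    currentInstantLogBlocks.addAll(instantsBlocks); // appends at the tail, in iteration order
    return currentInstantLogBlocks;
  }

  public static void main(String[] args) {
    List<String> instantsBlocks = Arrays.asList("block-1", "block-2", "block-3");
    System.out.println(withLambda(instantsBlocks));          // prints [block-1, block-2, block-3]
    System.out.println(withMethodReference(instantsBlocks)); // prints [block-1, block-2, block-3]
    System.out.println(withAddAll(instantsBlocks));          // prints [block-1, block-2, block-3]
  }
}
```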



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
     }
   }
 
+  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {

Review Comment:
   Btw, how is the new scan invoked? Is it configurable? For example, if someone has not enabled log compaction, do we still go with scanV2?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
     }
   }
 
+  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {

Review Comment:
   I want to be conservative here.
   If we plan to use scanV2 even for regular flows (i.e., even when log compaction is not enabled), can we make it configurable? That way, if we encounter any bugs in scanV2, we at least have a way to unblock reads. This log record reader has been around since the beginning, and there could be corner cases that we may not be able to reason about right away.
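A minimal sketch of such a switch, assuming a `java.util.Properties`-style config source. The key `example.log.reader.scan.v2.enable` and the class `ScanVersionSelector` are hypothetical and do not exist in Hudi. Defaulting to the existing scan is what provides the escape hatch described above.

```java
import java.util.Properties;

class ScanVersionSelector {
  // Hypothetical key, for illustration only; not an existing Hudi config.
  static final String ENABLE_SCAN_V2 = "example.log.reader.scan.v2.enable";

  // Defaults to the existing scan, so if scanV2 misbehaves, reads can be
  // unblocked by leaving the flag unset or setting it to false.
  static boolean useScanV2(Properties props) {
    return Boolean.parseBoolean(props.getProperty(ENABLE_SCAN_V2, "false"));
  }

  // Dispatch point; returns the name of the method that would run.
  static String selectScan(Properties props) {
    return useScanV2(props) ? "scanInternalV2" : "scanInternal";
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println(selectScan(props)); // prints scanInternal
    props.setProperty(ENABLE_SCAN_V2, "true");
    System.out.println(selectScan(props)); // prints scanInternalV2
  }
}
```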
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
     }
   }
 
+  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
+      for (int i = orderedInstantsList.size() - 1; i >= 0; i--) {
+        String instantTime = orderedInstantsList.get(i);
+
+        // Exclude the blocks which are included in targetRollbackInstants set.
+        // Here, rollback can include instants affiliated to deltacommits or log compaction commits.
+        if (targetRollbackInstants.contains(instantTime)) {
+          numBlocksRolledBack += instantToBlocksMap.get(instantTime).size();
+          continue;
+        }
+        List<HoodieLogBlock> instantsBlocks = instantToBlocksMap.get(instantTime);
+        if (instantsBlocks.size() == 0) {
+          throw new HoodieException("Data corrupted while writing. Found zero blocks for an instant " + instantTime);
+        }
+        HoodieLogBlock firstBlock = instantsBlocks.get(0);
+
+        // For compacted blocks COMPACTED_BLOCK_TIMES entry is present under its headers.
+        if (firstBlock.getLogBlockHeader().containsKey(COMPACTED_BLOCK_TIMES)) {
+          // When compacted blocks are seen update the blockTimeToCompactionBlockTimeMap.
+          Arrays.stream(firstBlock.getLogBlockHeader().get(COMPACTED_BLOCK_TIMES).split(","))
+              .forEach(originalInstant -> {
+                String finalInstant = blockTimeToCompactionBlockTimeMap.get(instantTime);

Review Comment:
   If it's null, we should probably set the value to `instantTime`; if not, we should set it to `finalInstant`. Maybe you can use `getOrDefault` here.
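A minimal sketch of the `getOrDefault` suggestion, written against a simplified model rather than Hudi's classes: instant times are plain strings, and the hypothetical `compactedTimes` map stands in for each compacted block's `COMPACTED_BLOCK_TIMES` header. Walking the instants newest-first and resolving each compacted block with `getOrDefault(instantTime, instantTime)` reproduces the multi-layer mapping described in the method's Javadoc (B1, B2 -> M1; M1, B3, B4 -> M2).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CompactionChainSketch {
  // Hypothetical helper (not Hudi code): walks instants newest-first and maps every
  // instant folded into a compacted block to the outermost compacted block holding it.
  static Map<String, String> resolve(List<String> orderedInstants,
                                     Map<String, List<String>> compactedTimes) {
    Map<String, String> blockTimeToCompactionBlockTimeMap = new HashMap<>();
    for (int i = orderedInstants.size() - 1; i >= 0; i--) {
      String instantTime = orderedInstants.get(i);
      List<String> originals = compactedTimes.get(instantTime);
      if (originals == null) {
        continue; // plain data block: nothing to record in this sketch
      }
      // If this compacted block was itself compacted later, reuse that final instant;
      // otherwise this block is the final one.
      String finalInstant = blockTimeToCompactionBlockTimeMap.getOrDefault(instantTime, instantTime);
      for (String originalInstant : originals) {
        blockTimeToCompactionBlockTimeMap.put(originalInstant, finalInstant);
      }
    }
    return blockTimeToCompactionBlockTimeMap;
  }

  public static void main(String[] args) {
    List<String> ordered = List.of("B1", "B2", "M1", "B3", "B4", "M2");
    Map<String, List<String>> compacted = Map.of(
        "M1", List.of("B1", "B2"),
        "M2", List.of("M1", "B3", "B4"));
    // TreeMap only sorts the keys so the printed order is stable.
    System.out.println(new TreeMap<>(resolve(ordered, compacted)));
    // prints {B1=M2, B2=M2, B3=M2, B4=M2, M1=M2}
  }
}
```

Assuming a compacted block is always written after the blocks it absorbs, the newest-first walk records an outer block such as M2 before the inner M1, so one lookup per compacted block is enough to reach the final instant.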



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
     }
   }
 
+  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
+    currentInstantLogBlocks = new ArrayDeque<>();
+    progress = 0.0f;
+    totalLogFiles = new AtomicLong(0);
+    totalRollbacks = new AtomicLong(0);
+    totalCorruptBlocks = new AtomicLong(0);
+    totalLogBlocks = new AtomicLong(0);
+    totalLogRecords = new AtomicLong(0);
+    HoodieLogFormatReader logFormatReaderWrapper = null;
+    HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
+    HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
+    HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
+    try {
+
+      // Get the key field based on populate meta fields config
+      // and the table type
+      final String keyField = getKeyField();
+
+      boolean enableRecordLookups = !forceFullScan;
+      // Iterate over the paths
+      logFormatReaderWrapper = new HoodieLogFormatReader(fs,
+          logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
+          readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
+
+      /**
+       * Scanning log blocks and placing the compacted blocks at the right place require two traversals.
+       * First traversal to identify the rollback blocks and valid data and compacted blocks.
+       *
+       * Scanning blocks is easy to do in single writer mode, where the rollback block is right after the effected data blocks.
+       * With multiwriter mode the blocks can be out of sync. An example scenario.
+       * B1, B2, B3, B4, R1(B3), B5
+       * In this case, rollback block R1 is invalidating the B3 which is not the previous block.
+       * This becomes more complicated if we have compacted blocks, which are data blocks created using log compaction.
+       *
+       * To solve this, run a single traversal, collect all the valid blocks that are not corrupted
+       * along with the block instant times and rollback block's target instant times.
+       *
+       * As part of second traversal iterate block instant times in reverse order.
+       * While iterating in reverse order keep a track of final compacted instant times for each block.
+       * In doing so, when a data block is seen include the final compacted block if it is not already added.
+       *
+       * find the final compacted block which contains the merged contents.
+       * For example B1 and B2 are merged and created a compacted block called M1 and now M1, B3 and B4 are merged and
+       * created another compacted block called M2. So, now M2 is the final block which contains all the changes of B1,B2,B3,B4.
+       * So, blockTimeToCompactionBlockTimeMap will look like
+       * (B1 -> M2), (B2 -> M2), (B3 -> M2), (B4 -> M2), (M1 -> M2)
+       * This map is updated while iterating and is used to place the compacted blocks in the correct position.
+       * This way we can have multiple layers of merge blocks and still be able to find the correct positions of merged blocks.
+       */
+
+      // Collect targetRollbackInstants, using which we can determine which blocks are invalid.
+      Set<String> targetRollbackInstants = new HashSet<>();
+
+      // This holds block instant time to list of blocks. Note here the log blocks can be normal data blocks or compacted log blocks.
+      Map<String, List<HoodieLogBlock>> instantToBlocksMap = new HashMap<>();
+
+      // Order of Instants.
+      List<String> orderedInstantsList = new ArrayList<>();
+
+      Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
+
+      /*
+       * 1. First step to traverse in forward direction. While traversing the log blocks collect following,
+       *    a. instant times
+       *    b. instant to logblocks map.
+       *    c. targetRollbackInstants.
+       */
+      while (logFormatReaderWrapper.hasNext()) {
+        HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
+        LOG.info("Scanning log file " + logFile);
+        scannedLogFiles.add(logFile);
+        totalLogFiles.set(scannedLogFiles.size());
+        // Use the HoodieLogFileReader to iterate through the blocks in the log file
+        HoodieLogBlock logBlock = logFormatReaderWrapper.next();
+        final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
+        totalLogBlocks.incrementAndGet();
+        // Ignore the corrupt blocks. No further handling is required for them.
+        if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
+          LOG.info("Found a corrupt block in " + logFile.getPath());
+          totalCorruptBlocks.incrementAndGet();
+          continue;
+        }
+        if (!HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME),
+            HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime)) {
+          // hit a block with instant time greater than should be processed, stop processing further
+          break;
+        }
+        if (logBlock.getBlockType() != COMMAND_BLOCK) {
+          if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
+              || inflightInstantsTimeline.containsInstant(instantTime)) {
+            // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
+            continue;
+          }
+          if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) {
+            // filter the log block by instant range
+            continue;
+          }
+        }
+
+        switch (logBlock.getBlockType()) {
+          case HFILE_DATA_BLOCK:
+          case AVRO_DATA_BLOCK:
+          case DELETE_BLOCK:
+            List<HoodieLogBlock> logBlocksList = instantToBlocksMap.getOrDefault(instantTime, new ArrayList<>());
+            if (logBlocksList.size() == 0) {
+              // Keep a track of instant Times in the order of arrival.
+              orderedInstantsList.add(instantTime);
+            }
+            logBlocksList.add(logBlock);
+            instantToBlocksMap.put(instantTime, logBlocksList);
+            break;
+          case COMMAND_BLOCK:
+            LOG.info("Reading a command block from file " + logFile.getPath());
+            // This is a command block - take appropriate action based on the command
+            HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
+
+            // Rollback blocks contain information of instants that are failed, collect them in a set..
+            if (commandBlock.getType().equals(ROLLBACK_BLOCK)) {
+              totalRollbacks.incrementAndGet();
+              String targetInstantForCommandBlock =
+                  logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
+              targetRollbackInstants.add(targetInstantForCommandBlock);
+            } else {
+              throw new UnsupportedOperationException("Command type not yet supported.");
+            }
+            break;
+          default:
+            throw new UnsupportedOperationException("Block type not yet supported.");
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ordered instant times seen " + orderedInstantsList);
+      }
+
+      int numBlocksRolledBack = 0;
+
+      // All the block's instants time that are added to the queue are collected in this set.
+      Set<String> instantTimesIncluded = new HashSet<>();
+
+      // Key will have details related to instant time and value will be empty if that instant is not compacted.
+      // Ex: B1(i1), B2(i2), CB(i3,[i1,i2]) entries will be like i1 -> i3, i2 -> i3.
+      Map<String, String> blockTimeToCompactionBlockTimeMap = new HashMap<>();
+
+      /*
+       * 2. Iterate the instants list in reverse order to get the latest instants first.
+       *    While iterating update the blockTimeToCompactionBlockTimesMap and include the compacted blocks in right position.
+       */
+      for (int i = orderedInstantsList.size() - 1; i >= 0; i--) {
+        String instantTime = orderedInstantsList.get(i);
+
+        // Exclude the blocks which are included in targetRollbackInstants set.
+        // Here, rollback can include instants affiliated to deltacommits or log compaction commits.
+        if (targetRollbackInstants.contains(instantTime)) {
+          numBlocksRolledBack += instantToBlocksMap.get(instantTime).size();
+          continue;
+        }
+        List<HoodieLogBlock> instantsBlocks = instantToBlocksMap.get(instantTime);
+        if (instantsBlocks.size() == 0) {
+          throw new HoodieException("Data corrupted while writing. Found zero blocks for an instant " + instantTime);
+        }
+        HoodieLogBlock firstBlock = instantsBlocks.get(0);
+
+        // For compacted blocks COMPACTED_BLOCK_TIMES entry is present under its headers.
+        if (firstBlock.getLogBlockHeader().containsKey(COMPACTED_BLOCK_TIMES)) {
+          // When compacted blocks are seen update the blockTimeToCompactionBlockTimeMap.
+          Arrays.stream(firstBlock.getLogBlockHeader().get(COMPACTED_BLOCK_TIMES).split(","))
+              .forEach(originalInstant -> {
+                String finalInstant = blockTimeToCompactionBlockTimeMap.get(instantTime);
+                blockTimeToCompactionBlockTimeMap.put(originalInstant, finalInstant);
+              });
+        } else {
+          // When a data block is found check if it is already compacted.
+          String compactedFinalInstantTime = blockTimeToCompactionBlockTimeMap.get(instantTime);
+          if (compactedFinalInstantTime == null) {
+            // If it is not compacted then add the blocks related to the instant time at the end of the queue and continue.
+            instantToBlocksMap.get(instantTime).forEach(block -> currentInstantLogBlocks.addLast(block));

Review Comment:
   Regarding L560:
   When we have multiple blocks for a given instantTime, shouldn't we be adding those blocks in reverse order?
   For example, say C1 has LB_1, LB_2 and LB_3, and C2 has LB_4.
   
   So, instantToBlocksMap will have 2 entries.
   When we process C1, the value is going to be LB_1, LB_2, LB_3, right? So we can't just do a forEach here; we have to add LB_3, followed by LB_2, followed by LB_1.
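A small sketch of the ordering concern, assuming (as the comment implies) that `currentInstantLogBlocks` is later drained from the tail with `pollLast()`. Strings stand in for log blocks, and `ReverseInsertSketch.drainOrder` is a hypothetical helper, not Hudi code. Appending C1's blocks with a plain `forEach` drains them as LB_3, LB_2, LB_1, while appending them in reverse restores LB_1, LB_2, LB_3.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReverseInsertSketch {
  // Models the example above: C1 -> [LB_1, LB_2, LB_3], C2 -> [LB_4].
  static List<String> drainOrder(boolean reverseWithinInstant) {
    List<String> orderedInstants = List.of("C1", "C2");
    Map<String, List<String>> instantToBlocksMap = new HashMap<>();
    instantToBlocksMap.put("C1", List.of("LB_1", "LB_2", "LB_3"));
    instantToBlocksMap.put("C2", List.of("LB_4"));

    Deque<String> queue = new ArrayDeque<>();
    // Instants are visited newest-first, as in the reverse loop of the diff.
    for (int i = orderedInstants.size() - 1; i >= 0; i--) {
      List<String> blocks = instantToBlocksMap.get(orderedInstants.get(i));
      if (reverseWithinInstant) {
        for (int j = blocks.size() - 1; j >= 0; j--) {
          queue.addLast(blocks.get(j));
        }
      } else {
        blocks.forEach(queue::addLast); // what the reviewed line does
      }
    }
    // Assumed consumer: drain from the tail, oldest block first.
    List<String> drained = new ArrayList<>();
    while (!queue.isEmpty()) {
      drained.add(queue.pollLast());
    }
    return drained;
  }

  public static void main(String[] args) {
    System.out.println(drainOrder(false)); // [LB_3, LB_2, LB_1, LB_4]
    System.out.println(drainOrder(true));  // [LB_1, LB_2, LB_3, LB_4]
  }
}
```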
   
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
     }
   }
 
+        // For compacted blocks COMPACTED_BLOCK_TIMES entry is present under its headers.
+        if (firstBlock.getLogBlockHeader().containsKey(COMPACTED_BLOCK_TIMES)) {
+          // When compacted blocks are seen update the blockTimeToCompactionBlockTimeMap.
+          Arrays.stream(firstBlock.getLogBlockHeader().get(COMPACTED_BLOCK_TIMES).split(","))
+              .forEach(originalInstant -> {
+                String finalInstant = blockTimeToCompactionBlockTimeMap.get(instantTime);

Review Comment:
   As per the example above, when we encounter i3, `blockTimeToCompactionBlockTimeMap.get(instantTime)` will return null, right? Can you help me understand what I am missing here?
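A short trace of the question, using a simplified string model with a hypothetical class and method name: replaying CB(i3, [i1, i2]) with a plain `get` stores null for both i1 and i2, because nothing has been recorded for i3 yet. A later `blockTimeToCompactionBlockTimeMap.get(instantTime) == null` check, like the one in the `else` branch of the diff, then cannot tell these instants apart from ones that were never compacted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NullLookupSketch {
  // Replays the i1, i2, i3 example with a plain get(), as in the line under review.
  static Map<String, String> withPlainGet() {
    Map<String, String> blockTimeToCompactionBlockTimeMap = new HashMap<>();
    String instantTime = "i3"; // CB(i3, [i1, i2]) is the first instant seen in reverse order
    for (String originalInstant : List.of("i1", "i2")) {
      // Nothing has been stored for i3 yet, so this lookup yields null.
      String finalInstant = blockTimeToCompactionBlockTimeMap.get(instantTime);
      blockTimeToCompactionBlockTimeMap.put(originalInstant, finalInstant);
    }
    return blockTimeToCompactionBlockTimeMap;
  }

  public static void main(String[] args) {
    Map<String, String> map = withPlainGet();
    System.out.println(map.containsKey("i1")); // true
    System.out.println(map.get("i1"));         // null, same as an instant that was never compacted
  }
}
```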



-- 
This is an automated message from the Apache Git Service.