nsivabalan commented on code in PR #5958:
URL: https://github.com/apache/hudi/pull/5958#discussion_r956606125


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> 
keySpecOpt) {
     }
   }
 
+  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean 
skipProcessingBlocks) {
+    currentInstantLogBlocks = new ArrayDeque<>();
+    progress = 0.0f;
+    totalLogFiles = new AtomicLong(0);
+    totalRollbacks = new AtomicLong(0);
+    totalCorruptBlocks = new AtomicLong(0);
+    totalLogBlocks = new AtomicLong(0);
+    totalLogRecords = new AtomicLong(0);
+    HoodieLogFormatReader logFormatReaderWrapper = null;
+    HoodieTimeline commitsTimeline = 
this.hoodieTableMetaClient.getCommitsTimeline();
+    HoodieTimeline completedInstantsTimeline = 
commitsTimeline.filterCompletedInstants();
+    HoodieTimeline inflightInstantsTimeline = 
commitsTimeline.filterInflights();
+    try {
+
+      // Get the key field based on populate meta fields config
+      // and the table type
+      final String keyField = getKeyField();
+
+      boolean enableRecordLookups = !forceFullScan;
+      // Iterate over the paths
+      logFormatReaderWrapper = new HoodieLogFormatReader(fs,
+          logFilePaths.stream().map(logFile -> new HoodieLogFile(new 
Path(logFile))).collect(Collectors.toList()),
+          readerSchema, readBlocksLazily, reverseReader, bufferSize, 
enableRecordLookups, keyField, internalSchema);
+
+      /**
+       * Scanning log blocks and placing the compacted blocks at the right 
place require two traversals.
+       * First traversal to identify the rollback blocks and valid data and 
compacted blocks.
+       *
+       * Scanning blocks is easy to do in single writer mode, where the 
rollback block is right after the effected data blocks.
+       * With multiwriter mode the blocks can be out of sync. An example 
scenario.
+       * B1, B2, B3, B4, R1(B3), B5
+       * In this case, rollback block R1 is invalidating the B3 which is not 
the previous block.
+       * This becomes more complicated if we have compacted blocks, which are 
data blocks created using log compaction.
+       *
+       * To solve this, run a single traversal, collect all the valid blocks 
that are not corrupted
+       * along with the block instant times and rollback block's target 
instant times.
+       *
+       * As part of second traversal iterate block instant times in reverse 
order.
+       * While iterating in reverse order keep a track of final compacted 
instant times for each block.
+       * In doing so, when a data block is seen include the final compacted 
block if it is not already added.
+       *
+       * find the final compacted block which contains the merged contents.
+       * For example B1 and B2 are merged and created a compacted block called 
M1 and now M1, B3 and B4 are merged and
+       * created another compacted block called M2. So, now M2 is the final 
block which contains all the changes of B1,B2,B3,B4.
+       * So, blockTimeToCompactionBlockTimeMap will look like
+       * (B1 -> M2), (B2 -> M2), (B3 -> M2), (B4 -> M2), (M1 -> M2)
+       * This map is updated while iterating and is used to place the 
compacted blocks in the correct position.
+       * This way we can have multiple layers of merge blocks and still be 
able to find the correct positions of merged blocks.
+       */
+
+      // Collect targetRollbackInstants, using which we can determine which 
blocks are invalid.
+      Set<String> targetRollbackInstants = new HashSet<>();
+
+      // This holds block instant time to list of blocks. Note here the log 
blocks can be normal data blocks or compacted log blocks.
+      Map<String, List<HoodieLogBlock>> instantToBlocksMap = new HashMap<>();
+
+      // Order of Instants.
+      List<String> orderedInstantsList = new ArrayList<>();
+
+      Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
+
+      /*
+       * 1. First step to traverse in forward direction. While traversing the 
log blocks collect following,
+       *    a. instant times
+       *    b. instant to logblocks map.
+       *    c. targetRollbackInstants.
+       */
+      while (logFormatReaderWrapper.hasNext()) {
+        HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
+        LOG.info("Scanning log file " + logFile);
+        scannedLogFiles.add(logFile);
+        totalLogFiles.set(scannedLogFiles.size());
+        // Use the HoodieLogFileReader to iterate through the blocks in the 
log file
+        HoodieLogBlock logBlock = logFormatReaderWrapper.next();
+        final String instantTime = 
logBlock.getLogBlockHeader().get(INSTANT_TIME);
+        totalLogBlocks.incrementAndGet();
+        // Ignore the corrupt blocks. No further handling is required for them.
+        if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
+          LOG.info("Found a corrupt block in " + logFile.getPath());
+          totalCorruptBlocks.incrementAndGet();
+          continue;
+        }
+        if 
(!HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME),
+            HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime)) {
+          // hit a block with instant time greater than should be processed, 
stop processing further
+          break;
+        }
+        if (logBlock.getBlockType() != COMMAND_BLOCK) {
+          if 
(!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
+              || inflightInstantsTimeline.containsInstant(instantTime)) {
+            // hit an uncommitted block possibly from a failed write, move to 
the next one and skip processing this one
+            continue;
+          }
+          if (instantRange.isPresent() && 
!instantRange.get().isInRange(instantTime)) {
+            // filter the log block by instant range
+            continue;
+          }
+        }
+
+        switch (logBlock.getBlockType()) {
+          case HFILE_DATA_BLOCK:
+          case AVRO_DATA_BLOCK:
+          case DELETE_BLOCK:
+            List<HoodieLogBlock> logBlocksList = 
instantToBlocksMap.getOrDefault(instantTime, new ArrayList<>());
+            if (logBlocksList.size() == 0) {
+              // Keep a track of instant Times in the order of arrival.
+              orderedInstantsList.add(instantTime);
+            }
+            logBlocksList.add(logBlock);
+            instantToBlocksMap.put(instantTime, logBlocksList);
+            break;
+          case COMMAND_BLOCK:
+            LOG.info("Reading a command block from file " + logFile.getPath());
+            // This is a command block - take appropriate action based on the 
command
+            HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
+
+            // Rollback blocks contain information of instants that are 
failed, collect them in a set..
+            if (commandBlock.getType().equals(ROLLBACK_BLOCK)) {
+              totalRollbacks.incrementAndGet();
+              String targetInstantForCommandBlock =
+                  logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
+              targetRollbackInstants.add(targetInstantForCommandBlock);
+            } else {
+              throw new UnsupportedOperationException("Command type not yet 
supported.");
+            }
+            break;
+          default:
+            throw new UnsupportedOperationException("Block type not yet 
supported.");
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ordered instant times seen " + orderedInstantsList);
+      }
+
+      int numBlocksRolledBack = 0;
+
+      // All the block's instants time that are added to the queue are 
collected in this set.
+      Set<String> instantTimesIncluded = new HashSet<>();
+
+      // Key will have details related to instant time and value will be empty 
if that instant is not compacted.
+      // Ex: B1(i1), B2(i2), CB(i3,[i1,i2]) entries will be like i1 -> i3, i2 
-> i3.
+      Map<String, String> blockTimeToCompactionBlockTimeMap = new HashMap<>();
+
+      /*
+       * 2. Iterate the instants list in reverse order to get the latest 
instants first.
+       *    While iterating update the blockTimeToCompactionBlockTimesMap and 
include the compacted blocks in right position.
+       */
+      for (int i = orderedInstantsList.size() - 1; i >= 0; i--) {
+        String instantTime = orderedInstantsList.get(i);
+
+        // Exclude the blocks which are included in targetRollbackInstants set.
+        // Here, rollback can include instants affiliated to deltacommits or 
log compaction commits.
+        if (targetRollbackInstants.contains(instantTime)) {
+          numBlocksRolledBack += instantToBlocksMap.get(instantTime).size();
+          continue;
+        }
+        List<HoodieLogBlock> instantsBlocks = 
instantToBlocksMap.get(instantTime);
+        if (instantsBlocks.size() == 0) {
+          throw new HoodieException("Data corrupted while writing. Found zero 
blocks for an instant " + instantTime);
+        }
+        HoodieLogBlock firstBlock = instantsBlocks.get(0);
+
+        // For compacted blocks COMPACTED_BLOCK_TIMES entry is present under 
its headers.
+        if (firstBlock.getLogBlockHeader().containsKey(COMPACTED_BLOCK_TIMES)) 
{
+          // When compacted blocks are seen update the 
blockTimeToCompactionBlockTimeMap.
+          
Arrays.stream(firstBlock.getLogBlockHeader().get(COMPACTED_BLOCK_TIMES).split(","))
+              .forEach(originalInstant -> {
+                String finalInstant = 
blockTimeToCompactionBlockTimeMap.get(instantTime);
+                blockTimeToCompactionBlockTimeMap.put(originalInstant, 
finalInstant);
+              });
+        } else {
+          // When a data block is found check if it is already compacted.
+          String compactedFinalInstantTime = 
blockTimeToCompactionBlockTimeMap.get(instantTime);
+          if (compactedFinalInstantTime == null) {
+            // If it is not compacted then add the blocks related to the 
instant time at the end of the queue and continue.
+            instantToBlocksMap.get(instantTime).forEach(block -> 
currentInstantLogBlocks.addLast(block));
+            instantTimesIncluded.add(instantTime);
+            validBlockInstants.add(instantTime);
+            continue;
+          }
+          // If the compacted block exists and it is already included in the 
dequeue then ignore and continue.
+          if (instantTimesIncluded.contains(compactedFinalInstantTime)) {
+            continue;
+          }

Review Comment:
   L565 to 572 is a bit counter-intuitive. Why can't we add this within the if 
   block at L545? I.e., when we encounter an instant time which represents a log 
   compaction, we can add all log blocks to validBlockInstants right there. Why do 
   we need to wait until we encounter one of the data blocks which was part of the 
   compacted log block? Can you throw some light on this, please?
   
   Also, similar comment as above: we can't do forEach here. We might have to add 
   the blocks in reverse order.
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> 
keySpecOpt) {
     }
   }
 
+  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean 
skipProcessingBlocks) {
+    currentInstantLogBlocks = new ArrayDeque<>();
+    progress = 0.0f;
+    totalLogFiles = new AtomicLong(0);
+    totalRollbacks = new AtomicLong(0);
+    totalCorruptBlocks = new AtomicLong(0);
+    totalLogBlocks = new AtomicLong(0);
+    totalLogRecords = new AtomicLong(0);
+    HoodieLogFormatReader logFormatReaderWrapper = null;
+    HoodieTimeline commitsTimeline = 
this.hoodieTableMetaClient.getCommitsTimeline();
+    HoodieTimeline completedInstantsTimeline = 
commitsTimeline.filterCompletedInstants();
+    HoodieTimeline inflightInstantsTimeline = 
commitsTimeline.filterInflights();
+    try {
+
+      // Get the key field based on populate meta fields config
+      // and the table type
+      final String keyField = getKeyField();
+
+      boolean enableRecordLookups = !forceFullScan;
+      // Iterate over the paths
+      logFormatReaderWrapper = new HoodieLogFormatReader(fs,
+          logFilePaths.stream().map(logFile -> new HoodieLogFile(new 
Path(logFile))).collect(Collectors.toList()),
+          readerSchema, readBlocksLazily, reverseReader, bufferSize, 
enableRecordLookups, keyField, internalSchema);
+
+      /**
+       * Scanning log blocks and placing the compacted blocks at the right 
place require two traversals.
+       * First traversal to identify the rollback blocks and valid data and 
compacted blocks.
+       *
+       * Scanning blocks is easy to do in single writer mode, where the 
rollback block is right after the effected data blocks.
+       * With multiwriter mode the blocks can be out of sync. An example 
scenario.
+       * B1, B2, B3, B4, R1(B3), B5
+       * In this case, rollback block R1 is invalidating the B3 which is not 
the previous block.
+       * This becomes more complicated if we have compacted blocks, which are 
data blocks created using log compaction.
+       *
+       * To solve this, run a single traversal, collect all the valid blocks 
that are not corrupted
+       * along with the block instant times and rollback block's target 
instant times.
+       *
+       * As part of second traversal iterate block instant times in reverse 
order.
+       * While iterating in reverse order keep a track of final compacted 
instant times for each block.
+       * In doing so, when a data block is seen include the final compacted 
block if it is not already added.
+       *
+       * find the final compacted block which contains the merged contents.
+       * For example B1 and B2 are merged and created a compacted block called 
M1 and now M1, B3 and B4 are merged and
+       * created another compacted block called M2. So, now M2 is the final 
block which contains all the changes of B1,B2,B3,B4.
+       * So, blockTimeToCompactionBlockTimeMap will look like
+       * (B1 -> M2), (B2 -> M2), (B3 -> M2), (B4 -> M2), (M1 -> M2)
+       * This map is updated while iterating and is used to place the 
compacted blocks in the correct position.
+       * This way we can have multiple layers of merge blocks and still be 
able to find the correct positions of merged blocks.
+       */
+
+      // Collect targetRollbackInstants, using which we can determine which 
blocks are invalid.
+      Set<String> targetRollbackInstants = new HashSet<>();
+
+      // This holds block instant time to list of blocks. Note here the log 
blocks can be normal data blocks or compacted log blocks.
+      Map<String, List<HoodieLogBlock>> instantToBlocksMap = new HashMap<>();
+
+      // Order of Instants.
+      List<String> orderedInstantsList = new ArrayList<>();
+
+      Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
+
+      /*
+       * 1. First step to traverse in forward direction. While traversing the 
log blocks collect following,
+       *    a. instant times
+       *    b. instant to logblocks map.
+       *    c. targetRollbackInstants.
+       */
+      while (logFormatReaderWrapper.hasNext()) {
+        HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
+        LOG.info("Scanning log file " + logFile);
+        scannedLogFiles.add(logFile);
+        totalLogFiles.set(scannedLogFiles.size());
+        // Use the HoodieLogFileReader to iterate through the blocks in the 
log file
+        HoodieLogBlock logBlock = logFormatReaderWrapper.next();
+        final String instantTime = 
logBlock.getLogBlockHeader().get(INSTANT_TIME);
+        totalLogBlocks.incrementAndGet();
+        // Ignore the corrupt blocks. No further handling is required for them.
+        if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
+          LOG.info("Found a corrupt block in " + logFile.getPath());
+          totalCorruptBlocks.incrementAndGet();
+          continue;
+        }
+        if 
(!HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME),
+            HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime)) {
+          // hit a block with instant time greater than should be processed, 
stop processing further
+          break;
+        }
+        if (logBlock.getBlockType() != COMMAND_BLOCK) {
+          if 
(!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
+              || inflightInstantsTimeline.containsInstant(instantTime)) {
+            // hit an uncommitted block possibly from a failed write, move to 
the next one and skip processing this one
+            continue;
+          }
+          if (instantRange.isPresent() && 
!instantRange.get().isInRange(instantTime)) {
+            // filter the log block by instant range
+            continue;
+          }
+        }
+
+        switch (logBlock.getBlockType()) {
+          case HFILE_DATA_BLOCK:
+          case AVRO_DATA_BLOCK:
+          case DELETE_BLOCK:
+            List<HoodieLogBlock> logBlocksList = 
instantToBlocksMap.getOrDefault(instantTime, new ArrayList<>());
+            if (logBlocksList.size() == 0) {
+              // Keep a track of instant Times in the order of arrival.
+              orderedInstantsList.add(instantTime);
+            }
+            logBlocksList.add(logBlock);
+            instantToBlocksMap.put(instantTime, logBlocksList);
+            break;
+          case COMMAND_BLOCK:
+            LOG.info("Reading a command block from file " + logFile.getPath());
+            // This is a command block - take appropriate action based on the 
command
+            HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
+
+            // Rollback blocks contain information of instants that are 
failed, collect them in a set..
+            if (commandBlock.getType().equals(ROLLBACK_BLOCK)) {
+              totalRollbacks.incrementAndGet();
+              String targetInstantForCommandBlock =
+                  logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
+              targetRollbackInstants.add(targetInstantForCommandBlock);
+            } else {
+              throw new UnsupportedOperationException("Command type not yet 
supported.");
+            }
+            break;
+          default:
+            throw new UnsupportedOperationException("Block type not yet 
supported.");
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ordered instant times seen " + orderedInstantsList);
+      }
+
+      int numBlocksRolledBack = 0;
+
+      // All the block's instants time that are added to the queue are 
collected in this set.
+      Set<String> instantTimesIncluded = new HashSet<>();
+
+      // Key will have details related to instant time and value will be empty 
if that instant is not compacted.
+      // Ex: B1(i1), B2(i2), CB(i3,[i1,i2]) entries will be like i1 -> i3, i2 
-> i3.
+      Map<String, String> blockTimeToCompactionBlockTimeMap = new HashMap<>();
+
+      /*
+       * 2. Iterate the instants list in reverse order to get the latest 
instants first.
+       *    While iterating update the blockTimeToCompactionBlockTimesMap and 
include the compacted blocks in right position.
+       */
+      for (int i = orderedInstantsList.size() - 1; i >= 0; i--) {
+        String instantTime = orderedInstantsList.get(i);
+
+        // Exclude the blocks which are included in targetRollbackInstants set.
+        // Here, rollback can include instants affiliated to deltacommits or 
log compaction commits.
+        if (targetRollbackInstants.contains(instantTime)) {
+          numBlocksRolledBack += instantToBlocksMap.get(instantTime).size();
+          continue;
+        }
+        List<HoodieLogBlock> instantsBlocks = 
instantToBlocksMap.get(instantTime);
+        if (instantsBlocks.size() == 0) {
+          throw new HoodieException("Data corrupted while writing. Found zero 
blocks for an instant " + instantTime);
+        }
+        HoodieLogBlock firstBlock = instantsBlocks.get(0);
+
+        // For compacted blocks COMPACTED_BLOCK_TIMES entry is present under 
its headers.
+        if (firstBlock.getLogBlockHeader().containsKey(COMPACTED_BLOCK_TIMES)) 
{
+          // When compacted blocks are seen update the 
blockTimeToCompactionBlockTimeMap.
+          
Arrays.stream(firstBlock.getLogBlockHeader().get(COMPACTED_BLOCK_TIMES).split(","))
+              .forEach(originalInstant -> {
+                String finalInstant = 
blockTimeToCompactionBlockTimeMap.get(instantTime);
+                blockTimeToCompactionBlockTimeMap.put(originalInstant, 
finalInstant);
+              });
+        } else {
+          // When a data block is found check if it is already compacted.
+          String compactedFinalInstantTime = 
blockTimeToCompactionBlockTimeMap.get(instantTime);
+          if (compactedFinalInstantTime == null) {
+            // If it is not compacted then add the blocks related to the 
instant time at the end of the queue and continue.
+            instantToBlocksMap.get(instantTime).forEach(block -> 
currentInstantLogBlocks.addLast(block));
+            instantTimesIncluded.add(instantTime);
+            validBlockInstants.add(instantTime);
+            continue;
+          }
+          // If the compacted block exists and it is already included in the 
dequeue then ignore and continue.
+          if (instantTimesIncluded.contains(compactedFinalInstantTime)) {
+            continue;
+          }

Review Comment:
   Never mind, I got the reason. Essentially, if a log-compacted block is again part of 
   another log compaction, we do not want to include it. Maybe you can 
   add some docs around it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to