This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new dd1841376fcd refactor: Clean up Spurious log block handling in LogRecordReader (#14287)
dd1841376fcd is described below

commit dd1841376fcd6670b0fae92fa37f0b36ab3aeb38
Author: Pavithran Ravichandiran <[email protected]>
AuthorDate: Mon Nov 24 20:49:59 2025 -0800

    refactor: Clean up Spurious log block handling in LogRecordReader (#14287)
    
    
    ---------
    
    Co-authored-by: Pavithran Ravichandiran <[email protected]>
---
 .../table/log/BaseHoodieLogRecordReader.java       | 152 +--------------------
 1 file changed, 2 insertions(+), 150 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index b471f5089369..ac0a82334f2e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -34,7 +34,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.read.buffer.HoodieFileGroupRecordBuffer;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -58,9 +57,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 
-import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BLOCK_IDENTIFIER;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME;
@@ -207,8 +204,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
 
   private void scanInternalV1(Option<KeySpec> keySpecOpt) {
     currentInstantLogBlocks = new ArrayDeque<>();
-    List<HoodieLogBlock> validLogBlockInstants = new ArrayList<>();
-    Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> 
blockSequenceMapPerCommit = new HashMap<>();
 
     progress = 0.0f;
     totalLogFiles = new AtomicLong(0);
@@ -231,14 +226,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
         // Use the HoodieLogFileReader to iterate through the blocks in the 
log file
         HoodieLogBlock logBlock = logFormatReaderWrapper.next();
         final String instantTime = 
logBlock.getLogBlockHeader().get(INSTANT_TIME);
-        final String blockSequenceNumberStr = 
logBlock.getLogBlockHeader().getOrDefault(BLOCK_IDENTIFIER, "");
-        int blockSeqNo = -1;
-        long attemptNo = -1L;
-        if (!StringUtils.isNullOrEmpty(blockSequenceNumberStr)) {
-          String[] parts = blockSequenceNumberStr.split(",");
-          attemptNo = Long.parseLong(parts[0]);
-          blockSeqNo = Integer.parseInt(parts[1]);
-        }
         totalLogBlocks.incrementAndGet();
         if (logBlock.isDataOrDeleteBlock()) {
           if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT) && 
!allowInflightInstants) {
@@ -266,17 +253,11 @@ public abstract class BaseHoodieLogRecordReader<T> {
             LOG.info("Reading a data block from file {} at instant {}", 
logFile.getPath(), instantTime);
             // store the current block
             currentInstantLogBlocks.push(logBlock);
-            validLogBlockInstants.add(logBlock);
-            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, 
attemptNo,
-                blockSequenceMapPerCommit);
             break;
           case DELETE_BLOCK:
             LOG.info("Reading a delete block from file {}", logFile.getPath());
             // store deletes so can be rolled back
             currentInstantLogBlocks.push(logBlock);
-            validLogBlockInstants.add(logBlock);
-            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, 
attemptNo,
-                blockSequenceMapPerCommit);
             break;
           case COMMAND_BLOCK:
             // Consider the following scenario
@@ -321,26 +302,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
                   return false;
                 });
 
-                // remove entire entry from blockSequenceTracker
-                blockSequenceMapPerCommit.remove(targetInstantForCommandBlock);
-
-                /// remove all matching log blocks from valid list tracked so 
far
-                validLogBlockInstants = 
validLogBlockInstants.stream().filter(block -> {
-                  // handle corrupt blocks separately since they may not have 
metadata
-                  if (block.getBlockType() == CORRUPT_BLOCK) {
-                    LOG.info("Rolling back the last corrupted log block read 
in {}", logFile.getPath());
-                    return true;
-                  }
-                  if 
(targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME)))
 {
-                    // rollback older data block or delete block
-                    LOG.info(
-                        "Rolling back an older log block read from {} with 
instantTime {}",
-                        logFile.getPath(), targetInstantForCommandBlock);
-                    return false;
-                  }
-                  return true;
-                }).collect(Collectors.toList());
-
                 final int numBlocksRolledBack = 
instantLogBlockSizeBeforeRollback - currentInstantLogBlocks.size();
                 totalRollbacks.addAndGet(numBlocksRolledBack);
                 LOG.info("Number of applied rollback blocks {}", 
numBlocksRolledBack);
@@ -358,9 +319,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
             totalCorruptBlocks.incrementAndGet();
             // If there is a corrupt block - we will assume that this was the 
next data block
             currentInstantLogBlocks.push(logBlock);
-            validLogBlockInstants.add(logBlock);
-            // we don't need to update the block sequence tracker here, since 
the block sequence tracker is meant to remove additional/spurious valid 
logblocks.
-            // anyway, contents of corrupt blocks are not read.
             break;
           default:
             throw new UnsupportedOperationException("Block type not supported 
yet");
@@ -368,18 +326,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
       }
       // merge the last read block when all the blocks are done reading
       if (!currentInstantLogBlocks.isEmpty()) {
-        Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo = 
reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants, 
blockSequenceMapPerCommit);
-        if (dedupedLogBlocksInfo.getKey()) {
-          // if there are duplicate log blocks that needs to be removed, we 
re-create the queue for valid log blocks from dedupedLogBlocks
-          currentInstantLogBlocks = new ArrayDeque<>();
-          dedupedLogBlocksInfo.getValue().forEach(block -> 
currentInstantLogBlocks.push(block));
-          LOG.info("Merging the final data blocks");
-          processQueuedBlocksForInstant(currentInstantLogBlocks, 
scannedLogFiles.size(), keySpecOpt);
-        } else {
-          // if there are no dups, we can take currentInstantLogBlocks as is.
-          LOG.info("Merging the final data blocks");
-          processQueuedBlocksForInstant(currentInstantLogBlocks, 
scannedLogFiles.size(), keySpecOpt);
-        }
+        LOG.info("Merging the final data blocks");
+        processQueuedBlocksForInstant(currentInstantLogBlocks, 
scannedLogFiles.size(), keySpecOpt);
       }
 
       // Done
@@ -403,102 +351,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
     }
   }
 
-  /**
-   * There could be spurious log blocks due to spark task retries. So, we will 
use BLOCK_SEQUENCE_NUMBER in the log block header to deduce such spurious log 
blocks and return
-   * a deduped set of log blocks.
-   *
-   * @param allValidLogBlocks         all valid log blocks parsed so far.
-   * @param blockSequenceMapPerCommit map containing block sequence numbers 
for every commit.
-   * @return a Pair of boolean and list of deduped valid block blocks, where 
boolean of true means, there have been dups detected.
-   */
-  private Pair<Boolean, List<HoodieLogBlock>> 
reconcileSpuriousBlocksAndGetValidOnes(List<HoodieLogBlock> allValidLogBlocks,
-                                                                               
      Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> 
blockSequenceMapPerCommit) {
-
-    boolean dupsFound = 
blockSequenceMapPerCommit.values().stream().anyMatch(perCommitBlockList -> 
perCommitBlockList.size() > 1);
-    if (dupsFound) {
-      // duplicates are found. we need to remove duplicate log blocks.
-      for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> 
entry : blockSequenceMapPerCommit.entrySet()) {
-        Map<Long, List<Pair<Integer, HoodieLogBlock>>> perCommitBlockSequences 
= entry.getValue();
-        if (perCommitBlockSequences.size() > 1) {
-          // only those that have more than 1 sequence needs deduping.
-          int maxSequenceCount = -1;
-          int maxAttemptNo = -1;
-          int totalSequences = perCommitBlockSequences.size();
-          int counter = 0;
-          for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> 
perAttemptEntries : perCommitBlockSequences.entrySet()) {
-            Long attemptNo = perAttemptEntries.getKey();
-            int size = perAttemptEntries.getValue().size();
-            if (maxSequenceCount < size) {
-              maxSequenceCount = size;
-              maxAttemptNo = Math.toIntExact(attemptNo);
-            }
-            counter++;
-          }
-          // for other sequence (!= maxSequenceIndex), we need to remove the 
corresponding logBlocks from allValidLogBlocks
-          for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> 
perAttemptEntries : perCommitBlockSequences.entrySet()) {
-            Long attemptNo = perAttemptEntries.getKey();
-            if (maxAttemptNo != attemptNo) {
-              List<HoodieLogBlock> logBlocksToRemove = 
perCommitBlockSequences.get(attemptNo).stream().map(Pair::getValue).collect(Collectors.toList());
-              logBlocksToRemove.forEach(logBlockToRemove -> 
allValidLogBlocks.remove(logBlocksToRemove));
-            }
-          }
-        }
-      }
-      return Pair.of(true, allValidLogBlocks);
-    } else {
-      return Pair.of(false, allValidLogBlocks);
-    }
-  }
-
-  /**
-   * Updates map tracking block seq no.
-   * Here is the map structure.
-   * Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> 
blockSequenceMapPerCommit
-   * Key: Commit time.
-   * Value: Map<Long, List<Pair<Integer, HoodieLogBlock>>>>
-   * Value refers to a Map of different attempts for the commit of interest. 
List contains the block seq number and the resp HoodieLogBlock.
-   * <p>
-   * For eg, if there were two attempts for a file slice while writing(due to 
spark task retries), here is how the map might look like
-   * key: commit1
-   * value : {
-   * 0L = List = { {0, lb1}, {1, lb2} },
-   * 1L = List = { {0, lb3}, {1, lb4}, {2, lb5}}
-   * }
-   * Meaning: for commit1, there was two attempts with Append Handle while 
writing. In first attempt, lb1 and lb2 was added. And in 2nd attempt lb3, lb4 
and lb5 was added.
-   * We keep populating this entire map and finally detect spurious log blocks 
and ignore them.
-   * In most cases, we might just see one set of sequence for a given commit.
-   *
-   * @param logBlock                  log block of interest to be added.
-   * @param instantTime               commit time of interest.
-   * @param blockSeqNo                block sequence number.
-   * @param blockSequenceMapPerCommit map tracking per commit block sequences.
-   */
-  private void updateBlockSequenceTracker(HoodieLogBlock logBlock, String 
instantTime, int blockSeqNo, long attemptNo,
-                                          Map<String, Map<Long, 
List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
-    if (blockSeqNo != -1 && attemptNo != -1) { // update the block sequence 
tracker for log blocks containing the same.
-      blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new 
HashMap<>());
-      Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = 
blockSequenceMapPerCommit.get(instantTime);
-      if (curCommitBlockMap.containsKey(attemptNo)) {
-        // append to existing map entry
-        curCommitBlockMap.get(attemptNo).add(Pair.of(blockSeqNo, logBlock));
-      } else {
-        // create a new map entry
-        curCommitBlockMap.put(attemptNo, new ArrayList<>());
-        curCommitBlockMap.get(attemptNo).add(Pair.of(blockSeqNo, logBlock));
-      }
-      // update the latest to block sequence tracker
-      blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
-    } else {
-      // all of older blocks are considered valid. there should be only one 
list for older commits where block sequence number is not present.
-      blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new 
HashMap<>());
-      Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = 
blockSequenceMapPerCommit.get(instantTime);
-      curCommitBlockMap.put(0L, new ArrayList<>());
-      curCommitBlockMap.get(0L).add(Pair.of(blockSeqNo, logBlock));
-      // update the latest to block sequence tracker
-      blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
-    }
-  }
-
   private void scanInternalV2(Option<KeySpec> keySpecOption, boolean 
skipProcessingBlocks) {
     currentInstantLogBlocks = new ArrayDeque<>();
     progress = 0.0f;

Reply via email to