This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new dd1841376fcd refactor: Clean up Spurious log block handling in
LogRecordReader (#14287)
dd1841376fcd is described below
commit dd1841376fcd6670b0fae92fa37f0b36ab3aeb38
Author: Pavithran Ravichandiran <[email protected]>
AuthorDate: Mon Nov 24 20:49:59 2025 -0800
refactor: Clean up Spurious log block handling in LogRecordReader (#14287)
---------
Co-authored-by: Pavithran Ravichandiran
<[email protected]>
---
.../table/log/BaseHoodieLogRecordReader.java | 152 +--------------------
1 file changed, 2 insertions(+), 150 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index b471f5089369..ac0a82334f2e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -34,7 +34,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.read.buffer.HoodieFileGroupRecordBuffer;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -58,9 +57,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import static
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BLOCK_IDENTIFIER;
import static
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES;
import static
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
import static
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME;
@@ -207,8 +204,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
private void scanInternalV1(Option<KeySpec> keySpecOpt) {
currentInstantLogBlocks = new ArrayDeque<>();
- List<HoodieLogBlock> validLogBlockInstants = new ArrayList<>();
- Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>>
blockSequenceMapPerCommit = new HashMap<>();
progress = 0.0f;
totalLogFiles = new AtomicLong(0);
@@ -231,14 +226,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
// Use the HoodieLogFileReader to iterate through the blocks in the
log file
HoodieLogBlock logBlock = logFormatReaderWrapper.next();
final String instantTime =
logBlock.getLogBlockHeader().get(INSTANT_TIME);
- final String blockSequenceNumberStr =
logBlock.getLogBlockHeader().getOrDefault(BLOCK_IDENTIFIER, "");
- int blockSeqNo = -1;
- long attemptNo = -1L;
- if (!StringUtils.isNullOrEmpty(blockSequenceNumberStr)) {
- String[] parts = blockSequenceNumberStr.split(",");
- attemptNo = Long.parseLong(parts[0]);
- blockSeqNo = Integer.parseInt(parts[1]);
- }
totalLogBlocks.incrementAndGet();
if (logBlock.isDataOrDeleteBlock()) {
if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT) &&
!allowInflightInstants) {
@@ -266,17 +253,11 @@ public abstract class BaseHoodieLogRecordReader<T> {
LOG.info("Reading a data block from file {} at instant {}",
logFile.getPath(), instantTime);
// store the current block
currentInstantLogBlocks.push(logBlock);
- validLogBlockInstants.add(logBlock);
- updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo,
attemptNo,
- blockSequenceMapPerCommit);
break;
case DELETE_BLOCK:
LOG.info("Reading a delete block from file {}", logFile.getPath());
// store deletes so can be rolled back
currentInstantLogBlocks.push(logBlock);
- validLogBlockInstants.add(logBlock);
- updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo,
attemptNo,
- blockSequenceMapPerCommit);
break;
case COMMAND_BLOCK:
// Consider the following scenario
@@ -321,26 +302,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
return false;
});
- // remove entire entry from blockSequenceTracker
- blockSequenceMapPerCommit.remove(targetInstantForCommandBlock);
-
- // remove all matching log blocks from the valid list tracked so
far
- validLogBlockInstants =
validLogBlockInstants.stream().filter(block -> {
- // handle corrupt blocks separately since they may not have
metadata
- if (block.getBlockType() == CORRUPT_BLOCK) {
- LOG.info("Rolling back the last corrupted log block read
in {}", logFile.getPath());
- return true;
- }
- if
(targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME)))
{
- // rollback older data block or delete block
- LOG.info(
- "Rolling back an older log block read from {} with
instantTime {}",
- logFile.getPath(), targetInstantForCommandBlock);
- return false;
- }
- return true;
- }).collect(Collectors.toList());
-
final int numBlocksRolledBack =
instantLogBlockSizeBeforeRollback - currentInstantLogBlocks.size();
totalRollbacks.addAndGet(numBlocksRolledBack);
LOG.info("Number of applied rollback blocks {}",
numBlocksRolledBack);
@@ -358,9 +319,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
totalCorruptBlocks.incrementAndGet();
// If there is a corrupt block - we will assume that this was the
next data block
currentInstantLogBlocks.push(logBlock);
- validLogBlockInstants.add(logBlock);
- // we don't need to update the block sequence tracker here, since
the block sequence tracker is meant to remove additional/spurious valid
logblocks.
- // anyway, contents of corrupt blocks are not read.
break;
default:
throw new UnsupportedOperationException("Block type not supported
yet");
@@ -368,18 +326,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
}
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
- Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo =
reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants,
blockSequenceMapPerCommit);
- if (dedupedLogBlocksInfo.getKey()) {
- // if there are duplicate log blocks that needs to be removed, we
re-create the queue for valid log blocks from dedupedLogBlocks
- currentInstantLogBlocks = new ArrayDeque<>();
- dedupedLogBlocksInfo.getValue().forEach(block ->
currentInstantLogBlocks.push(block));
- LOG.info("Merging the final data blocks");
- processQueuedBlocksForInstant(currentInstantLogBlocks,
scannedLogFiles.size(), keySpecOpt);
- } else {
- // if there are no dups, we can take currentInstantLogBlocks as is.
- LOG.info("Merging the final data blocks");
- processQueuedBlocksForInstant(currentInstantLogBlocks,
scannedLogFiles.size(), keySpecOpt);
- }
+ LOG.info("Merging the final data blocks");
+ processQueuedBlocksForInstant(currentInstantLogBlocks,
scannedLogFiles.size(), keySpecOpt);
}
// Done
@@ -403,102 +351,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
}
}
- /**
- * There could be spurious log blocks due to spark task retries. So, we will
use BLOCK_SEQUENCE_NUMBER in the log block header to detect such spurious log
blocks and return
- * a deduped set of log blocks.
- *
- * @param allValidLogBlocks all valid log blocks parsed so far.
- * @param blockSequenceMapPerCommit map containing block sequence numbers
for every commit.
- * @return a Pair of a boolean and a list of deduped valid log blocks, where a
boolean of true means that dups have been detected.
- */
- private Pair<Boolean, List<HoodieLogBlock>>
reconcileSpuriousBlocksAndGetValidOnes(List<HoodieLogBlock> allValidLogBlocks,
-
Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>>
blockSequenceMapPerCommit) {
-
- boolean dupsFound =
blockSequenceMapPerCommit.values().stream().anyMatch(perCommitBlockList ->
perCommitBlockList.size() > 1);
- if (dupsFound) {
- // duplicates are found. we need to remove duplicate log blocks.
- for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>>
entry : blockSequenceMapPerCommit.entrySet()) {
- Map<Long, List<Pair<Integer, HoodieLogBlock>>> perCommitBlockSequences
= entry.getValue();
- if (perCommitBlockSequences.size() > 1) {
- // only those that have more than 1 sequence needs deduping.
- int maxSequenceCount = -1;
- int maxAttemptNo = -1;
- int totalSequences = perCommitBlockSequences.size();
- int counter = 0;
- for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>>
perAttemptEntries : perCommitBlockSequences.entrySet()) {
- Long attemptNo = perAttemptEntries.getKey();
- int size = perAttemptEntries.getValue().size();
- if (maxSequenceCount < size) {
- maxSequenceCount = size;
- maxAttemptNo = Math.toIntExact(attemptNo);
- }
- counter++;
- }
- // for other sequence (!= maxSequenceIndex), we need to remove the
corresponding logBlocks from allValidLogBlocks
- for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>>
perAttemptEntries : perCommitBlockSequences.entrySet()) {
- Long attemptNo = perAttemptEntries.getKey();
- if (maxAttemptNo != attemptNo) {
- List<HoodieLogBlock> logBlocksToRemove =
perCommitBlockSequences.get(attemptNo).stream().map(Pair::getValue).collect(Collectors.toList());
- logBlocksToRemove.forEach(logBlockToRemove ->
allValidLogBlocks.remove(logBlocksToRemove));
- }
- }
- }
- }
- return Pair.of(true, allValidLogBlocks);
- } else {
- return Pair.of(false, allValidLogBlocks);
- }
- }
-
- /**
- * Updates map tracking block seq no.
- * Here is the map structure.
- * Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>>
blockSequenceMapPerCommit
- * Key: Commit time.
- * Value: Map<Long, List<Pair<Integer, HoodieLogBlock>>>>
- * Value refers to a Map of different attempts for the commit of interest.
List contains the block seq number and the resp HoodieLogBlock.
- * <p>
- * For eg, if there were two attempts for a file slice while writing(due to
spark task retries), here is how the map might look like
- * key: commit1
- * value : {
- * 0L = List = { {0, lb1}, {1, lb2} },
- * 1L = List = { {0, lb3}, {1, lb4}, {2, lb5}}
- * }
- * Meaning: for commit1, there were two attempts with Append Handle while
writing. In the first attempt, lb1 and lb2 were added. And in the 2nd attempt lb3, lb4
and lb5 were added.
- * We keep populating this entire map and finally detect spurious log blocks
and ignore them.
- * In most cases, we might just see one set of sequence for a given commit.
- *
- * @param logBlock log block of interest to be added.
- * @param instantTime commit time of interest.
- * @param blockSeqNo block sequence number.
- * @param blockSequenceMapPerCommit map tracking per commit block sequences.
- */
- private void updateBlockSequenceTracker(HoodieLogBlock logBlock, String
instantTime, int blockSeqNo, long attemptNo,
- Map<String, Map<Long,
List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
- if (blockSeqNo != -1 && attemptNo != -1) { // update the block sequence
tracker for log blocks containing the same.
- blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new
HashMap<>());
- Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap =
blockSequenceMapPerCommit.get(instantTime);
- if (curCommitBlockMap.containsKey(attemptNo)) {
- // append to existing map entry
- curCommitBlockMap.get(attemptNo).add(Pair.of(blockSeqNo, logBlock));
- } else {
- // create a new map entry
- curCommitBlockMap.put(attemptNo, new ArrayList<>());
- curCommitBlockMap.get(attemptNo).add(Pair.of(blockSeqNo, logBlock));
- }
- // update the latest to block sequence tracker
- blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
- } else {
- // all of older blocks are considered valid. there should be only one
list for older commits where block sequence number is not present.
- blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new
HashMap<>());
- Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap =
blockSequenceMapPerCommit.get(instantTime);
- curCommitBlockMap.put(0L, new ArrayList<>());
- curCommitBlockMap.get(0L).add(Pair.of(blockSeqNo, logBlock));
- // update the latest to block sequence tracker
- blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
- }
- }
-
private void scanInternalV2(Option<KeySpec> keySpecOption, boolean
skipProcessingBlocks) {
currentInstantLogBlocks = new ArrayDeque<>();
progress = 0.0f;