This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 9940305e721036a389bdc944a00586df7ed3bf42 Author: Sivabalan Narayanan <[email protected]> AuthorDate: Mon May 13 23:13:56 2024 -0700 [HUDI-7549] Reverting spurious log block deduction with LogRecordReader (#10922) Co-authored-by: Y Ethan Guo <[email protected]> --- .../org/apache/hudi/io/HoodieAppendHandle.java | 28 +--- .../org/apache/hudi/DummyTaskContextSupplier.java | 5 - .../hudi/client/FlinkTaskContextSupplier.java | 5 - .../java/org/apache/hudi/io/FlinkAppendHandle.java | 4 - .../client/common/JavaTaskContextSupplier.java | 6 - .../testutils/HoodieJavaClientTestHarness.java | 5 - .../hudi/client/SparkTaskContextSupplier.java | 5 - .../common/engine/LocalTaskContextSupplier.java | 5 - .../hudi/common/engine/TaskContextSupplier.java | 5 - .../table/log/AbstractHoodieLogRecordReader.java | 172 +-------------------- .../common/functional/TestHoodieLogFormat.java | 113 -------------- 11 files changed, 6 insertions(+), 347 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index ce4a4a46506..6ee5af67747 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -56,7 +56,6 @@ import org.apache.hudi.exception.HoodieAppendException; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; -import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; @@ -132,11 +131,6 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O private boolean useWriterSchema = false; private Properties recordProperties = new Properties(); - // Block Sequence number will be used to detect 
duplicate log blocks(by log reader) added due to spark task retries. - // It should always start with 0 for a given file slice. for roll overs and delete blocks, we increment by 1. - private int blockSequenceNumber = 0; - // On task failures, a given task could be retried. So, this attempt number will track the number of attempts. - private int attemptNumber = 0; /** * This is used by log compaction only. @@ -148,7 +142,6 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O this.useWriterSchema = true; this.isLogCompaction = true; this.header.putAll(header); - this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get(); } public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, @@ -158,7 +151,6 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O this.sizeEstimator = new DefaultSizeEstimator(); this.statuses = new ArrayList<>(); this.recordProperties.putAll(config.getProps()); - this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get(); } public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, @@ -455,13 +447,11 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O ? 
HoodieRecord.RECORD_KEY_METADATA_FIELD : hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp(); - blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, config, - addBlockIdentifier()), keyField)); + blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, header, keyField)); } if (appendDeleteBlocks && recordsToDelete.size() > 0) { - blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, config, - addBlockIdentifier()))); + blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), header)); } if (blocks.size() > 0) { @@ -558,10 +548,6 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O return true; } - protected boolean addBlockIdentifier() { - return true; - } - private void writeToBuffer(HoodieRecord<T> record) { if (!partitionPath.equals(record.getPartitionPath())) { HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: " @@ -635,16 +621,6 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O } } - private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber, long attemptNumber, - HoodieWriteConfig config, boolean addBlockIdentifier) { - Map<HeaderMetadataType, String> updatedHeader = new HashMap<>(); - updatedHeader.putAll(header); - if (addBlockIdentifier && !HoodieTableMetadata.isMetadataTable(config.getBasePath())) { // add block sequence numbers only for data table. 
- updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, String.valueOf(attemptNumber) + "," + String.valueOf(blockSequenceNumber)); - } - return updatedHeader; - } - private static HoodieLogBlock getBlock(HoodieWriteConfig writeConfig, HoodieLogBlock.HoodieLogBlockType logDataBlockFormat, List<HoodieRecord> records, diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java index d87b6147302..d2c07e35509 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java @@ -45,9 +45,4 @@ public class DummyTaskContextSupplier extends TaskContextSupplier { public Option<String> getProperty(EngineProperty prop) { return null; } - - @Override - public Supplier<Integer> getAttemptNumberSupplier() { - return null; - } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java index 03c835c5553..aab248fc3cf 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java @@ -62,9 +62,4 @@ public class FlinkTaskContextSupplier extends TaskContextSupplier { return Option.empty(); } - @Override - public Supplier<Integer> getAttemptNumberSupplier() { - return () -> -1; - } - } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java index 918fdcdb9eb..e1a030c97af 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java +++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java @@ -99,10 +99,6 @@ public class FlinkAppendHandle<T, I, K, O> && hoodieRecord.getCurrentLocation().getInstantTime().equals("U"); } - protected boolean addBlockIdentifier() { - return false; - } - @Override public List<WriteStatus> close() { try { diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java index b40419a8015..628201ccc25 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java @@ -44,10 +44,4 @@ public class JavaTaskContextSupplier extends TaskContextSupplier { public Option<String> getProperty(EngineProperty prop) { return Option.empty(); } - - @Override - public Supplier<Integer> getAttemptNumberSupplier() { - return () -> 0; - } - } diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java index 24e7c8ebba4..da8404a66f0 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java @@ -180,11 +180,6 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest public Option<String> getProperty(EngineProperty prop) { return Option.empty(); } - - @Override - public Supplier<Integer> getAttemptNumberSupplier() { - return () -> (int)attemptId; - } } protected void initFileSystem(String basePath, StorageConfiguration<?> hadoopConf) { diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java index 7cfa411511a..5b299d2e291 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java @@ -50,11 +50,6 @@ public class SparkTaskContextSupplier extends TaskContextSupplier implements Ser return () -> TaskContext.get().taskAttemptId(); } - @Override - public Supplier<Integer> getAttemptNumberSupplier() { - return () -> TaskContext.get().attemptNumber(); - } - @Override public Option<String> getProperty(EngineProperty prop) { if (prop == EngineProperty.TOTAL_MEMORY_AVAILABLE) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java index bff42692340..6b853b566e4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java @@ -46,9 +46,4 @@ public final class LocalTaskContextSupplier extends TaskContextSupplier { return Option.empty(); } - @Override - public Supplier<Integer> getAttemptNumberSupplier() { - return () -> 0; - } - } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java index 24a6d0e527a..813236c07a8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java @@ -35,9 +35,4 @@ public abstract class TaskContextSupplier implements Serializable { public abstract Supplier<Long> getAttemptIdSupplier(); public 
abstract Option<String> getProperty(EngineProperty prop); - - /** - * @returns the attempt number for the task of interest. Attempt starts with 0 and goes up by 1 on retries. - */ - public abstract Supplier<Integer> getAttemptNumberSupplier(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 2800b134ca3..66d96e8bfea 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -34,7 +34,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.InternalSchemaCache; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.CloseableMappingIterator; import org.apache.hudi.common.util.collection.Pair; @@ -66,7 +65,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; -import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BLOCK_IDENTIFIER; +import static org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME; @@ -225,8 +224,6 @@ public abstract class AbstractHoodieLogRecordReader { private void scanInternalV1(Option<KeySpec> keySpecOpt) { currentInstantLogBlocks = 
new ArrayDeque<>(); - List<HoodieLogBlock> validLogBlockInstants = new ArrayList<>(); - Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit = new HashMap<>(); AtomicBoolean blockIdentifiersPresent = new AtomicBoolean(false); progress = 0.0f; @@ -256,14 +253,6 @@ public abstract class AbstractHoodieLogRecordReader { // Use the HoodieLogFileReader to iterate through the blocks in the log file HoodieLogBlock logBlock = logFormatReaderWrapper.next(); final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME); - final String blockIdentifier = logBlock.getLogBlockHeader().getOrDefault(BLOCK_IDENTIFIER, StringUtils.EMPTY_STRING); - int blockSeqNumber = -1; - long attemptNumber = -1L; - if (!StringUtils.isNullOrEmpty(blockIdentifier)) { - String[] parts = blockIdentifier.split(","); - attemptNumber = Long.parseLong(parts[0]); - blockSeqNumber = Integer.parseInt(parts[1]); - } totalLogBlocks.incrementAndGet(); if (logBlock.getBlockType() != CORRUPT_BLOCK && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime @@ -289,15 +278,11 @@ public abstract class AbstractHoodieLogRecordReader { LOG.info("Reading a data block from file {} at instant {}", logFile.getPath(), instantTime); // store the current block currentInstantLogBlocks.push(logBlock); - validLogBlockInstants.add(logBlock); - updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber, attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent); break; case DELETE_BLOCK: LOG.info("Reading a delete block from file {}", logFile.getPath()); // store deletes so can be rolled back currentInstantLogBlocks.push(logBlock); - validLogBlockInstants.add(logBlock); - updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber, attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent); break; case COMMAND_BLOCK: // Consider the following scenario @@ -339,25 +324,6 @@ 
public abstract class AbstractHoodieLogRecordReader { } return false; }); - - // remove entire entry from blockSequenceTracker - blockSequenceMapPerCommit.remove(targetInstantForCommandBlock); - - /// remove all matching log blocks from valid list tracked so far - validLogBlockInstants = validLogBlockInstants.stream().filter(block -> { - // handle corrupt blocks separately since they may not have metadata - if (block.getBlockType() == CORRUPT_BLOCK) { - LOG.info("Rolling back the last corrupted log block read in {}", logFile.getPath()); - return true; - } - if (targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME))) { - // rollback older data block or delete block - LOG.info("Rolling back an older log block read from {} with instantTime {}", logFile.getPath(), targetInstantForCommandBlock); - return false; - } - return true; - }).collect(Collectors.toList()); - final int numBlocksRolledBack = instantLogBlockSizeBeforeRollback - currentInstantLogBlocks.size(); totalRollbacks.addAndGet(numBlocksRolledBack); LOG.info("Number of applied rollback blocks {}", numBlocksRolledBack); @@ -374,9 +340,6 @@ public abstract class AbstractHoodieLogRecordReader { totalCorruptBlocks.incrementAndGet(); // If there is a corrupt block - we will assume that this was the next data block currentInstantLogBlocks.push(logBlock); - validLogBlockInstants.add(logBlock); - // we don't need to update the block sequence tracker here, since the block sequence tracker is meant to remove additional/spurious valid logblocks. - // anyway, contents of corrupt blocks are not read. 
break; default: throw new UnsupportedOperationException("Block type not supported yet"); @@ -384,23 +347,9 @@ public abstract class AbstractHoodieLogRecordReader { } // merge the last read block when all the blocks are done reading if (!currentInstantLogBlocks.isEmpty()) { - boolean duplicateBlocksDetected = false; - if (blockIdentifiersPresent.get()) { - Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo = reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants, blockSequenceMapPerCommit); - duplicateBlocksDetected = dedupedLogBlocksInfo.getKey(); - if (duplicateBlocksDetected) { - // if there are duplicate log blocks that needs to be removed, we re-create the queue for valid log blocks from dedupedLogBlocks - currentInstantLogBlocks = new ArrayDeque<>(); - dedupedLogBlocksInfo.getValue().forEach(block -> currentInstantLogBlocks.push(block)); - LOG.info("Merging the final data blocks"); - processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt); - } - } - if (!duplicateBlocksDetected) { - // if there are no dups, we can take currentInstantLogBlocks as is. - LOG.info("Merging the final data blocks"); - processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt); - } + // if there are no dups, we can take currentInstantLogBlocks as is. + LOG.info("Merging the final data blocks"); + processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt); } // Done @@ -423,119 +372,6 @@ public abstract class AbstractHoodieLogRecordReader { } } - /** - * There could be spurious log blocks due to spark task retries. So, we will use BLOCK_SEQUENCE_NUMBER in the log block header to deduce such spurious log blocks and return - * a deduped set of log blocks. - * @param allValidLogBlocks all valid log blocks parsed so far. - * @param blockSequenceMapPerCommit map containing block sequence numbers for every commit. 
- * @return a Pair of boolean and list of deduped valid block blocks, where boolean of true means, there have been dups detected. - */ - private Pair<Boolean, List<HoodieLogBlock>> reconcileSpuriousBlocksAndGetValidOnes(List<HoodieLogBlock> allValidLogBlocks, - Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) { - - boolean dupsFound = blockSequenceMapPerCommit.values().stream().anyMatch(perCommitBlockList -> perCommitBlockList.size() > 1); - if (dupsFound) { - if (LOG.isDebugEnabled()) { - logBlockSequenceMapping(blockSequenceMapPerCommit); - } - - // duplicates are found. we need to remove duplicate log blocks. - for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> entry: blockSequenceMapPerCommit.entrySet()) { - Map<Long, List<Pair<Integer, HoodieLogBlock>>> perCommitBlockSequences = entry.getValue(); - if (perCommitBlockSequences.size() > 1) { - // only those that have more than 1 sequence needs deduping. - int maxSequenceCount = -1; - int maxAttemptNo = -1; - for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> perAttemptEntries : perCommitBlockSequences.entrySet()) { - Long attemptNo = perAttemptEntries.getKey(); - int size = perAttemptEntries.getValue().size(); - if (maxSequenceCount <= size) { - maxSequenceCount = size; - maxAttemptNo = Math.toIntExact(attemptNo); - } - } - // for other sequences (!= maxSequenceIndex), we need to remove the corresponding logBlocks from allValidLogBlocks - for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> perAttemptEntries : perCommitBlockSequences.entrySet()) { - Long attemptNo = perAttemptEntries.getKey(); - if (maxAttemptNo != attemptNo) { - List<HoodieLogBlock> logBlocksToRemove = perCommitBlockSequences.get(attemptNo).stream().map(Pair::getValue).collect(Collectors.toList()); - logBlocksToRemove.forEach(logBlockToRemove -> allValidLogBlocks.remove(logBlockToRemove)); - } - } - } - } - return Pair.of(true, allValidLogBlocks); - } else { - return 
Pair.of(false, allValidLogBlocks); - } - } - - private void logBlockSequenceMapping(Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) { - LOG.warn("Duplicate log blocks found "); - for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> entry : blockSequenceMapPerCommit.entrySet()) { - if (entry.getValue().size() > 1) { - LOG.warn("\tCommit time {}", entry.getKey()); - Map<Long, List<Pair<Integer, HoodieLogBlock>>> value = entry.getValue(); - for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> attemptsSeq : value.entrySet()) { - LOG.warn("\t\tAttempt number {}", attemptsSeq.getKey()); - attemptsSeq.getValue().forEach(entryValue -> LOG.warn("\t\t\tLog block sequence no : {}, log file {}", - entryValue.getKey(), entryValue.getValue().getBlockContentLocation().get().getLogFile().getPath().toString())); - } - } - } - } - - /** - * Updates map tracking block seq no. - * Here is the map structure. - * Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit - * Key: Commit time. - * Value: Map<Long, List<Pair<Integer, HoodieLogBlock>>>> - * Value refers to a Map of different attempts for the commit of interest. List contains the block seq number and the resp HoodieLogBlock. - * - * For eg, if there were two attempts for a file slice while writing(due to spark task retries), here is how the map might look like - * key: commit1 - * value : { - * 0L = List = { {0, lb1}, {1, lb2} }, - * 1L = List = { {0, lb3}, {1, lb4}, {2, lb5}} - * } - * Meaning: for commit1, there was two attempts with Append Handle while writing. In first attempt, lb1 and lb2 was added. And in 2nd attempt lb3, lb4 and lb5 was added. - * We keep populating this entire map and finally detect spurious log blocks and ignore them. - * In most cases, we might just see one set of sequence for a given commit. - * - * @param logBlock log block of interest to be added. - * @param instantTime commit time of interest. 
- * @param blockSeqNumber block sequence number. - * @param blockSequenceMapPerCommit map tracking per commit block sequences. - */ - private void updateBlockSequenceTracker(HoodieLogBlock logBlock, String instantTime, int blockSeqNumber, long attemptNumber, - Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit, - AtomicBoolean blockIdentifiersPresent) { - if (blockSeqNumber != -1 && attemptNumber != -1) { // update the block sequence tracker for log blocks containing the same. - blockIdentifiersPresent.set(true); - blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new HashMap<>()); - Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = blockSequenceMapPerCommit.get(instantTime); - if (curCommitBlockMap.containsKey(attemptNumber)) { - // append to existing map entry - curCommitBlockMap.get(attemptNumber).add(Pair.of(blockSeqNumber, logBlock)); - } else { - // create a new map entry - curCommitBlockMap.put(attemptNumber, new ArrayList<>()); - curCommitBlockMap.get(attemptNumber).add(Pair.of(blockSeqNumber, logBlock)); - } - // update the latest to block sequence tracker - blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap); - } else { - // all of older blocks are considered valid. there should be only one list for older commits where block sequence number is not present. 
- blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new HashMap<>()); - Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = blockSequenceMapPerCommit.get(instantTime); - curCommitBlockMap.computeIfAbsent(0L, entry -> new ArrayList<>()); - curCommitBlockMap.get(0L).add(Pair.of(blockSeqNumber, logBlock)); - // update the latest to block sequence tracker - blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap); - } - } - private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) { currentInstantLogBlocks = new ArrayDeque<>(); progress = 0.0f; diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 7b884ca70cf..db3c0e9354d 100755 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -112,7 +112,6 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import static java.util.stream.Collectors.toList; import static org.apache.hudi.common.testutils.HoodieTestUtils.getJavaVersion; import static org.apache.hudi.common.testutils.HoodieTestUtils.shouldUseExternalHdfs; import static org.apache.hudi.common.testutils.HoodieTestUtils.useExternalHdfs; @@ -685,108 +684,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { scanner.close(); } - @Test - public void testBasicAppendsWithBlockSeqNos() throws IOException, URISyntaxException, InterruptedException { - testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords, numFiles, enableBlockSeqNos) -> { - return writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos); - }); - } - - @Test - public void testAppendsWithSpruiousLogBlocksExactDup() throws IOException, URISyntaxException, 
InterruptedException { - testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords, numFiles, enableBlockSeqNos) -> { - Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos); - // re add the same records again - logFiles.addAll(writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos)); - return logFiles; - }); - } - - @Test - public void testAppendsWithSpruiousLogBlocksFirstAttemptPartial() throws IOException, URISyntaxException, InterruptedException { - testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords, numFiles, enableBlockSeqNos) -> { - Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos); - // removing 4th log block to simulate partial failure in 1st attempt - List<HoodieLogFile> logFileList = new ArrayList<>(logFiles); - logFiles.remove(logFileList.get(logFileList.size() - 1)); - // re add the same records again - logFiles.addAll(writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos)); - return logFiles; - }); - } - - @Test - public void testAppendsWithSpruiousLogBlocksSecondAttemptPartial() throws IOException, URISyntaxException, InterruptedException { - testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords, numFiles, enableBlockSeqNos) -> { - Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos); - // re add the same records again - Set<HoodieLogFile> logFilesSet2 = writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos); - // removing 4th log block to simular partial failure in 2nd attempt - List<HoodieLogFile> logFileList2 = new ArrayList<>(logFilesSet2); - logFilesSet2.remove(logFileList2.get(logFileList2.size() - 1)); - logFiles.addAll(logFilesSet2); - return logFiles; - }); - } - - private void testAppendsWithSpruiousLogBlocks( - boolean enableOptimizedLogBlocksScan, - 
Function5<Set<HoodieLogFile>, StoragePath, Schema, List<IndexedRecord>, Integer, - Boolean> logGenFunc) - throws IOException, URISyntaxException, InterruptedException { - - Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); - SchemaTestUtil testUtil = new SchemaTestUtil(); - List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 400); - Set<HoodieLogFile> logFiles = logGenFunc.apply(partitionPath, schema, genRecords, 4, true); - - FileCreateUtils.createDeltaCommit(basePath, "100", storage); - - HoodieMergedLogRecordScanner scanner = getLogRecordScanner(logFiles, schema, enableOptimizedLogBlocksScan); - // even though we have duplicates records, due to block sequence reconcile, only one set of blocks should be parsed as valid - assertRecordsAndCloseScanner(scanner, genRecords, schema); - } - - private void assertRecordsAndCloseScanner(HoodieMergedLogRecordScanner scanner, List<IndexedRecord> genRecords, Schema schema) throws IOException { - List<IndexedRecord> scannedRecords = new ArrayList<>(); - for (HoodieRecord record : scanner) { - scannedRecords.add((IndexedRecord) - ((HoodieAvroRecord) record).getData().getInsertValue(schema).get()); - } - - assertEquals(sort(genRecords), sort(scannedRecords), - "Scanner records count should be the same as appended records"); - scanner.close(); - } - - private HoodieMergedLogRecordScanner getLogRecordScanner(Set<HoodieLogFile> logFiles, Schema schema, - boolean enableOptimizedLogBlocksScan) { - - // scan all log blocks (across multiple log files) - return HoodieMergedLogRecordScanner.newBuilder() - .withStorage(storage) - .withBasePath(basePath) - .withLogFilePaths( - logFiles.stream().sorted(HoodieLogFile.getLogFileComparator()) - .map(l -> l.getPath().toString()).collect(toList())) - .withReaderSchema(schema) - .withLatestInstantTime("100") - .withMaxMemorySizeInBytes(10240L) - .withReverseReader(false) - .withBufferSize(BUFFER_SIZE) - .withSpillableMapBasePath(spillableBasePath) - 
.withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK) - .withBitCaskDiskMapCompressionEnabled(true) - .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan) - .build(); - } - - @FunctionalInterface - public interface Function5<R, T1, T2, T3, T4, T5> { - - R apply(T1 v1, T2 v2, T3 v3, T4 v4, T5 v5) throws IOException, InterruptedException; - } - @ParameterizedTest @MethodSource("testArguments") public void testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType diskMapType, @@ -2861,9 +2758,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { List<IndexedRecord> targetRecords = records.subList(offset, offset + targetRecordsCount); logFiles.add(writer.getLogFile()); - if (enableBlockSequenceNumbers) { - header = getUpdatedHeader(header, blockSeqNo++); - } writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, targetRecords, header)); filesWritten++; } @@ -2873,13 +2767,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { return logFiles; } - private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber) { - Map<HeaderMetadataType, String> updatedHeader = new HashMap<>(); - updatedHeader.putAll(header); - updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, String.valueOf(blockSequenceNumber)); - return updatedHeader; - } - /** * Utility to convert the given iterator to a List. */
