This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 45ad35b960c [HUDI-7549] Reverting spurious log block deduction with LogRecordReader (#10922)
45ad35b960c is described below
commit 45ad35b960cc0498caa00bd4ad7cb10fb0ad91d9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon May 13 23:13:56 2024 -0700
[HUDI-7549] Reverting spurious log block deduction with LogRecordReader (#10922)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../org/apache/hudi/io/HoodieAppendHandle.java | 25 +--
.../org/apache/hudi/DummyTaskContextSupplier.java | 5 -
.../hudi/client/FlinkTaskContextSupplier.java | 5 -
.../java/org/apache/hudi/io/FlinkAppendHandle.java | 4 -
.../client/common/JavaTaskContextSupplier.java | 6 -
.../testutils/HoodieJavaClientTestHarness.java | 5 -
.../hudi/client/SparkTaskContextSupplier.java | 5 -
.../common/engine/LocalTaskContextSupplier.java | 5 -
.../hudi/common/engine/TaskContextSupplier.java | 5 -
.../table/log/AbstractHoodieLogRecordReader.java | 172 +--------------------
.../common/functional/TestHoodieLogFormat.java | 113 --------------
11 files changed, 8 insertions(+), 342 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 3cc018010bd..fa69701af7a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -57,7 +57,6 @@ import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
@@ -130,11 +129,6 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
private boolean useWriterSchema = false;
private final Properties recordProperties = new Properties();
- // Block Sequence number will be used to detect duplicate log blocks(by log
reader) added due to spark task retries.
- // It should always start with 0 for a given file slice. for rolling-over
and delete blocks, we increment by 1.
- private int blockSequenceNumber = 0;
- // On task failures, a given task could be retried. So, this attempt number
will track the number of attempts.
- private int attemptNumber = 0;
/**
* This is used by log compaction only.
@@ -146,7 +140,6 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
this.useWriterSchema = true;
this.isLogCompaction = true;
this.header.putAll(header);
- this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
}
public HoodieAppendHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T, I, K, O> hoodieTable,
@@ -163,7 +156,6 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
this.sizeEstimator = new DefaultSizeEstimator();
this.statuses = new ArrayList<>();
this.recordProperties.putAll(config.getProps());
- this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
this.shouldWriteRecordPositions = config.shouldWriteRecordPositions();
}
@@ -470,14 +462,12 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
:
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList,
shouldWriteRecordPositions,
- getUpdatedHeader(header, blockSequenceNumber++, attemptNumber,
config,
- addBlockIdentifier()), keyField));
+ getUpdatedHeader(header, config), keyField));
}
if (appendDeleteBlocks && recordsToDeleteWithPositions.size() > 0) {
blocks.add(new HoodieDeleteBlock(recordsToDeleteWithPositions,
shouldWriteRecordPositions,
- getUpdatedHeader(header, blockSequenceNumber++, attemptNumber,
config,
- addBlockIdentifier())));
+ getUpdatedHeader(header, config)));
}
if (blocks.size() > 0) {
@@ -576,10 +566,6 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
return true;
}
- protected boolean addBlockIdentifier() {
- return true;
- }
-
private void writeToBuffer(HoodieRecord<T> record) {
if (!partitionPath.equals(record.getPartitionPath())) {
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched
partition path, record partition: "
@@ -654,12 +640,9 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
}
}
- private static Map<HeaderMetadataType, String>
getUpdatedHeader(Map<HeaderMetadataType, String> header, int
blockSequenceNumber, long attemptNumber,
-
HoodieWriteConfig config, boolean addBlockIdentifier) {
+ private static Map<HeaderMetadataType, String>
getUpdatedHeader(Map<HeaderMetadataType, String> header,
+
HoodieWriteConfig config) {
Map<HeaderMetadataType, String> updatedHeader = new HashMap<>(header);
- if (addBlockIdentifier &&
!HoodieTableMetadata.isMetadataTable(config.getBasePath())) { // add block
sequence numbers only for data table.
- updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, attemptNumber +
"," + blockSequenceNumber);
- }
if (config.shouldWritePartialUpdates()) {
// When enabling writing partial updates to the data blocks, the
"IS_PARTIAL" flag is also
// written to the block header so that the reader can differentiate
partial updates, i.e.,
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java
index d87b6147302..d2c07e35509 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java
@@ -45,9 +45,4 @@ public class DummyTaskContextSupplier extends
TaskContextSupplier {
public Option<String> getProperty(EngineProperty prop) {
return null;
}
-
- @Override
- public Supplier<Integer> getAttemptNumberSupplier() {
- return null;
- }
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
index 03c835c5553..aab248fc3cf 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
@@ -62,9 +62,4 @@ public class FlinkTaskContextSupplier extends
TaskContextSupplier {
return Option.empty();
}
- @Override
- public Supplier<Integer> getAttemptNumberSupplier() {
- return () -> -1;
- }
-
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index 9f99a1624d3..0b517b5d4ae 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -92,10 +92,6 @@ public class FlinkAppendHandle<T, I, K, O>
&& hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
}
- protected boolean addBlockIdentifier() {
- return false;
- }
-
@Override
public List<WriteStatus> close() {
try {
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
index b40419a8015..628201ccc25 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
@@ -44,10 +44,4 @@ public class JavaTaskContextSupplier extends
TaskContextSupplier {
public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}
-
- @Override
- public Supplier<Integer> getAttemptNumberSupplier() {
- return () -> 0;
- }
-
}
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index 05cbd7af8e8..61429b3fef2 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -180,11 +180,6 @@ public abstract class HoodieJavaClientTestHarness extends
HoodieWriterClientTest
public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}
-
- @Override
- public Supplier<Integer> getAttemptNumberSupplier() {
- return () -> (int)attemptId;
- }
}
protected void initFileSystem(String basePath, StorageConfiguration<?>
hadoopConf) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
index 7cfa411511a..5b299d2e291 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
@@ -50,11 +50,6 @@ public class SparkTaskContextSupplier extends
TaskContextSupplier implements Ser
return () -> TaskContext.get().taskAttemptId();
}
- @Override
- public Supplier<Integer> getAttemptNumberSupplier() {
- return () -> TaskContext.get().attemptNumber();
- }
-
@Override
public Option<String> getProperty(EngineProperty prop) {
if (prop == EngineProperty.TOTAL_MEMORY_AVAILABLE) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java
index bff42692340..6b853b566e4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java
@@ -46,9 +46,4 @@ public final class LocalTaskContextSupplier extends
TaskContextSupplier {
return Option.empty();
}
- @Override
- public Supplier<Integer> getAttemptNumberSupplier() {
- return () -> 0;
- }
-
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java
index 24a6d0e527a..813236c07a8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java
@@ -35,9 +35,4 @@ public abstract class TaskContextSupplier implements
Serializable {
public abstract Supplier<Long> getAttemptIdSupplier();
public abstract Option<String> getProperty(EngineProperty prop);
-
- /**
- * @returns the attempt number for the task of interest. Attempt starts with
0 and goes up by 1 on retries.
- */
- public abstract Supplier<Integer> getAttemptNumberSupplier();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 6f4e00dd053..c545640e3eb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -34,7 +34,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
@@ -66,7 +65,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BLOCK_IDENTIFIER;
+import static
org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK;
import static
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES;
import static
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
import static
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME;
@@ -225,8 +224,6 @@ public abstract class AbstractHoodieLogRecordReader {
private void scanInternalV1(Option<KeySpec> keySpecOpt) {
currentInstantLogBlocks = new ArrayDeque<>();
- List<HoodieLogBlock> validLogBlockInstants = new ArrayList<>();
- Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>>
blockSequenceMapPerCommit = new HashMap<>();
AtomicBoolean blockIdentifiersPresent = new AtomicBoolean(false);
progress = 0.0f;
@@ -253,14 +250,6 @@ public abstract class AbstractHoodieLogRecordReader {
// Use the HoodieLogFileReader to iterate through the blocks in the
log file
HoodieLogBlock logBlock = logFormatReaderWrapper.next();
final String instantTime =
logBlock.getLogBlockHeader().get(INSTANT_TIME);
- final String blockIdentifier =
logBlock.getLogBlockHeader().getOrDefault(BLOCK_IDENTIFIER,
StringUtils.EMPTY_STRING);
- int blockSeqNumber = -1;
- long attemptNumber = -1L;
- if (!StringUtils.isNullOrEmpty(blockIdentifier)) {
- String[] parts = blockIdentifier.split(",");
- attemptNumber = Long.parseLong(parts[0]);
- blockSeqNumber = Integer.parseInt(parts[1]);
- }
totalLogBlocks.incrementAndGet();
if (logBlock.getBlockType() != CORRUPT_BLOCK
&&
!HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME),
HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
@@ -281,15 +270,11 @@ public abstract class AbstractHoodieLogRecordReader {
LOG.info("Reading a data block from file {} at instant {}",
logFile.getPath(), instantTime);
// store the current block
currentInstantLogBlocks.push(logBlock);
- validLogBlockInstants.add(logBlock);
- updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber,
attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent);
break;
case DELETE_BLOCK:
LOG.info("Reading a delete block from file {}", logFile.getPath());
// store deletes so can be rolled back
currentInstantLogBlocks.push(logBlock);
- validLogBlockInstants.add(logBlock);
- updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber,
attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent);
break;
case COMMAND_BLOCK:
// Consider the following scenario
@@ -331,25 +316,6 @@ public abstract class AbstractHoodieLogRecordReader {
}
return false;
});
-
- // remove entire entry from blockSequenceTracker
- blockSequenceMapPerCommit.remove(targetInstantForCommandBlock);
-
- /// remove all matching log blocks from valid list tracked so
far
- validLogBlockInstants =
validLogBlockInstants.stream().filter(block -> {
- // handle corrupt blocks separately since they may not have
metadata
- if (block.getBlockType() == CORRUPT_BLOCK) {
- LOG.info("Rolling back the last corrupted log block read
in {}", logFile.getPath());
- return true;
- }
- if
(targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME)))
{
- // rollback older data block or delete block
- LOG.info("Rolling back an older log block read from {}
with instantTime {}", logFile.getPath(), targetInstantForCommandBlock);
- return false;
- }
- return true;
- }).collect(Collectors.toList());
-
final int numBlocksRolledBack =
instantLogBlockSizeBeforeRollback - currentInstantLogBlocks.size();
totalRollbacks.addAndGet(numBlocksRolledBack);
LOG.info("Number of applied rollback blocks {}",
numBlocksRolledBack);
@@ -366,9 +332,6 @@ public abstract class AbstractHoodieLogRecordReader {
totalCorruptBlocks.incrementAndGet();
// If there is a corrupt block - we will assume that this was the
next data block
currentInstantLogBlocks.push(logBlock);
- validLogBlockInstants.add(logBlock);
- // we don't need to update the block sequence tracker here, since
the block sequence tracker is meant to remove additional/spurious valid
logblocks.
- // anyway, contents of corrupt blocks are not read.
break;
default:
throw new UnsupportedOperationException("Block type not supported
yet");
@@ -376,23 +339,9 @@ public abstract class AbstractHoodieLogRecordReader {
}
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
- boolean duplicateBlocksDetected = false;
- if (blockIdentifiersPresent.get()) {
- Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo =
reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants,
blockSequenceMapPerCommit);
- duplicateBlocksDetected = dedupedLogBlocksInfo.getKey();
- if (duplicateBlocksDetected) {
- // if there are duplicate log blocks that needs to be removed, we
re-create the queue for valid log blocks from dedupedLogBlocks
- currentInstantLogBlocks = new ArrayDeque<>();
- dedupedLogBlocksInfo.getValue().forEach(block ->
currentInstantLogBlocks.push(block));
- LOG.info("Merging the final data blocks");
- processQueuedBlocksForInstant(currentInstantLogBlocks,
scannedLogFiles.size(), keySpecOpt);
- }
- }
- if (!duplicateBlocksDetected) {
- // if there are no dups, we can take currentInstantLogBlocks as is.
- LOG.info("Merging the final data blocks");
- processQueuedBlocksForInstant(currentInstantLogBlocks,
scannedLogFiles.size(), keySpecOpt);
- }
+ // if there are no dups, we can take currentInstantLogBlocks as is.
+ LOG.info("Merging the final data blocks");
+ processQueuedBlocksForInstant(currentInstantLogBlocks,
scannedLogFiles.size(), keySpecOpt);
}
// Done
@@ -415,119 +364,6 @@ public abstract class AbstractHoodieLogRecordReader {
}
}
- /**
- * There could be spurious log blocks due to spark task retries. So, we will
use BLOCK_SEQUENCE_NUMBER in the log block header to deduce such spurious log
blocks and return
- * a deduped set of log blocks.
- * @param allValidLogBlocks all valid log blocks parsed so far.
- * @param blockSequenceMapPerCommit map containing block sequence numbers
for every commit.
- * @return a Pair of boolean and list of deduped valid block blocks, where
boolean of true means, there have been dups detected.
- */
- private Pair<Boolean, List<HoodieLogBlock>>
reconcileSpuriousBlocksAndGetValidOnes(List<HoodieLogBlock> allValidLogBlocks,
-
Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>>
blockSequenceMapPerCommit) {
-
- boolean dupsFound =
blockSequenceMapPerCommit.values().stream().anyMatch(perCommitBlockList ->
perCommitBlockList.size() > 1);
- if (dupsFound) {
- if (LOG.isDebugEnabled()) {
- logBlockSequenceMapping(blockSequenceMapPerCommit);
- }
-
- // duplicates are found. we need to remove duplicate log blocks.
- for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>>
entry: blockSequenceMapPerCommit.entrySet()) {
- Map<Long, List<Pair<Integer, HoodieLogBlock>>> perCommitBlockSequences
= entry.getValue();
- if (perCommitBlockSequences.size() > 1) {
- // only those that have more than 1 sequence needs deduping.
- int maxSequenceCount = -1;
- int maxAttemptNo = -1;
- for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>>
perAttemptEntries : perCommitBlockSequences.entrySet()) {
- Long attemptNo = perAttemptEntries.getKey();
- int size = perAttemptEntries.getValue().size();
- if (maxSequenceCount <= size) {
- maxSequenceCount = size;
- maxAttemptNo = Math.toIntExact(attemptNo);
- }
- }
- // for other sequences (!= maxSequenceIndex), we need to remove the
corresponding logBlocks from allValidLogBlocks
- for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>>
perAttemptEntries : perCommitBlockSequences.entrySet()) {
- Long attemptNo = perAttemptEntries.getKey();
- if (maxAttemptNo != attemptNo) {
- List<HoodieLogBlock> logBlocksToRemove =
perCommitBlockSequences.get(attemptNo).stream().map(Pair::getValue).collect(Collectors.toList());
- logBlocksToRemove.forEach(logBlockToRemove ->
allValidLogBlocks.remove(logBlockToRemove));
- }
- }
- }
- }
- return Pair.of(true, allValidLogBlocks);
- } else {
- return Pair.of(false, allValidLogBlocks);
- }
- }
-
- private void logBlockSequenceMapping(Map<String, Map<Long,
List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
- LOG.warn("Duplicate log blocks found ");
- for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>>
entry : blockSequenceMapPerCommit.entrySet()) {
- if (entry.getValue().size() > 1) {
- LOG.warn("\tCommit time {}", entry.getKey());
- Map<Long, List<Pair<Integer, HoodieLogBlock>>> value =
entry.getValue();
- for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> attemptsSeq
: value.entrySet()) {
- LOG.warn("\t\tAttempt number {}", attemptsSeq.getKey());
- attemptsSeq.getValue().forEach(entryValue -> LOG.warn("\t\t\tLog
block sequence no : {}, log file {}",
- entryValue.getKey(),
entryValue.getValue().getBlockContentLocation().get().getLogFile().getPath().toString()));
- }
- }
- }
- }
-
- /**
- * Updates map tracking block seq no.
- * Here is the map structure.
- * Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>>
blockSequenceMapPerCommit
- * Key: Commit time.
- * Value: Map<Long, List<Pair<Integer, HoodieLogBlock>>>>
- * Value refers to a Map of different attempts for the commit of interest.
List contains the block seq number and the resp HoodieLogBlock.
- *
- * For eg, if there were two attempts for a file slice while writing(due to
spark task retries), here is how the map might look like
- * key: commit1
- * value : {
- * 0L = List = { {0, lb1}, {1, lb2} },
- * 1L = List = { {0, lb3}, {1, lb4}, {2, lb5}}
- * }
- * Meaning: for commit1, there was two attempts with Append Handle while
writing. In first attempt, lb1 and lb2 was added. And in 2nd attempt lb3, lb4
and lb5 was added.
- * We keep populating this entire map and finally detect spurious log
blocks and ignore them.
- * In most cases, we might just see one set of sequence for a given commit.
- *
- * @param logBlock log block of interest to be added.
- * @param instantTime commit time of interest.
- * @param blockSeqNumber block sequence number.
- * @param blockSequenceMapPerCommit map tracking per commit block sequences.
- */
- private void updateBlockSequenceTracker(HoodieLogBlock logBlock, String
instantTime, int blockSeqNumber, long attemptNumber,
- Map<String, Map<Long,
List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit,
- AtomicBoolean
blockIdentifiersPresent) {
- if (blockSeqNumber != -1 && attemptNumber != -1) { // update the block
sequence tracker for log blocks containing the same.
- blockIdentifiersPresent.set(true);
- blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new
HashMap<>());
- Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap =
blockSequenceMapPerCommit.get(instantTime);
- if (curCommitBlockMap.containsKey(attemptNumber)) {
- // append to existing map entry
- curCommitBlockMap.get(attemptNumber).add(Pair.of(blockSeqNumber,
logBlock));
- } else {
- // create a new map entry
- curCommitBlockMap.put(attemptNumber, new ArrayList<>());
- curCommitBlockMap.get(attemptNumber).add(Pair.of(blockSeqNumber,
logBlock));
- }
- // update the latest to block sequence tracker
- blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
- } else {
- // all of older blocks are considered valid. there should be only one
list for older commits where block sequence number is not present.
- blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new
HashMap<>());
- Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap =
blockSequenceMapPerCommit.get(instantTime);
- curCommitBlockMap.computeIfAbsent(0L, entry -> new ArrayList<>());
- curCommitBlockMap.get(0L).add(Pair.of(blockSeqNumber, logBlock));
- // update the latest to block sequence tracker
- blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
- }
- }
-
private void scanInternalV2(Option<KeySpec> keySpecOption, boolean
skipProcessingBlocks) {
currentInstantLogBlocks = new ArrayDeque<>();
progress = 0.0f;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 84a9f6c9c4e..792d28550e8 100755
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -114,7 +114,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static java.util.stream.Collectors.toList;
import static org.apache.hudi.common.testutils.HoodieTestUtils.getJavaVersion;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.shouldUseExternalHdfs;
import static org.apache.hudi.common.testutils.HoodieTestUtils.useExternalHdfs;
@@ -711,108 +710,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
scanner.close();
}
- @Test
- public void testBasicAppendsWithBlockSeqNos() throws IOException,
URISyntaxException, InterruptedException {
- testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords,
numFiles, enableBlockSeqNos) -> {
- return writeLogFiles(partitionPath, schema, genRecords, numFiles,
enableBlockSeqNos);
- });
- }
-
- @Test
- public void testAppendsWithSpruiousLogBlocksExactDup() throws IOException,
URISyntaxException, InterruptedException {
- testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords,
numFiles, enableBlockSeqNos) -> {
- Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema,
genRecords, numFiles, enableBlockSeqNos);
- // re add the same records again
- logFiles.addAll(writeLogFiles(partitionPath, schema, genRecords,
numFiles, enableBlockSeqNos));
- return logFiles;
- });
- }
-
- @Test
- public void testAppendsWithSpruiousLogBlocksFirstAttemptPartial() throws
IOException, URISyntaxException, InterruptedException {
- testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords,
numFiles, enableBlockSeqNos) -> {
- Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema,
genRecords, numFiles, enableBlockSeqNos);
- // removing 4th log block to simulate partial failure in 1st attempt
- List<HoodieLogFile> logFileList = new ArrayList<>(logFiles);
- logFiles.remove(logFileList.get(logFileList.size() - 1));
- // re add the same records again
- logFiles.addAll(writeLogFiles(partitionPath, schema, genRecords,
numFiles, enableBlockSeqNos));
- return logFiles;
- });
- }
-
- @Test
- public void testAppendsWithSpruiousLogBlocksSecondAttemptPartial() throws
IOException, URISyntaxException, InterruptedException {
- testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords,
numFiles, enableBlockSeqNos) -> {
- Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema,
genRecords, numFiles, enableBlockSeqNos);
- // re add the same records again
- Set<HoodieLogFile> logFilesSet2 = writeLogFiles(partitionPath, schema,
genRecords, numFiles, enableBlockSeqNos);
- // removing 4th log block to simular partial failure in 2nd attempt
- List<HoodieLogFile> logFileList2 = new ArrayList<>(logFilesSet2);
- logFilesSet2.remove(logFileList2.get(logFileList2.size() - 1));
- logFiles.addAll(logFilesSet2);
- return logFiles;
- });
- }
-
- private void testAppendsWithSpruiousLogBlocks(
- boolean enableOptimizedLogBlocksScan,
- Function5<Set<HoodieLogFile>, StoragePath, Schema, List<IndexedRecord>,
Integer,
- Boolean> logGenFunc)
- throws IOException, URISyntaxException, InterruptedException {
-
- Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
- SchemaTestUtil testUtil = new SchemaTestUtil();
- List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0,
400);
- Set<HoodieLogFile> logFiles = logGenFunc.apply(partitionPath, schema,
genRecords, 4, true);
-
- FileCreateUtils.createDeltaCommit(basePath, "100", storage);
-
- HoodieMergedLogRecordScanner scanner = getLogRecordScanner(logFiles,
schema, enableOptimizedLogBlocksScan);
- // even though we have duplicates records, due to block sequence
reconcile, only one set of blocks should be parsed as valid
- assertRecordsAndCloseScanner(scanner, genRecords, schema);
- }
-
- private void assertRecordsAndCloseScanner(HoodieMergedLogRecordScanner
scanner, List<IndexedRecord> genRecords, Schema schema) throws IOException {
- List<IndexedRecord> scannedRecords = new ArrayList<>();
- for (HoodieRecord record : scanner) {
- scannedRecords.add((IndexedRecord)
- ((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
- }
-
- assertEquals(sort(genRecords), sort(scannedRecords),
- "Scanner records count should be the same as appended records");
- scanner.close();
- }
-
- private HoodieMergedLogRecordScanner getLogRecordScanner(Set<HoodieLogFile>
logFiles, Schema schema,
- boolean
enableOptimizedLogBlocksScan) {
-
- // scan all log blocks (across multiple log files)
- return HoodieMergedLogRecordScanner.newBuilder()
- .withStorage(storage)
- .withBasePath(basePath)
- .withLogFilePaths(
- logFiles.stream().sorted(HoodieLogFile.getLogFileComparator())
- .map(l -> l.getPath().toString()).collect(toList()))
- .withReaderSchema(schema)
- .withLatestInstantTime("100")
- .withMaxMemorySizeInBytes(10240L)
- .withReverseReader(false)
- .withBufferSize(BUFFER_SIZE)
- .withSpillableMapBasePath(spillableBasePath)
- .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
- .withBitCaskDiskMapCompressionEnabled(true)
- .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
- .build();
- }
-
- @FunctionalInterface
- public interface Function5<R, T1, T2, T3, T4, T5> {
-
- R apply(T1 v1, T2 v2, T3 v3, T4 v4, T5 v5) throws IOException,
InterruptedException;
- }
-
@ParameterizedTest
@MethodSource("testArguments")
public void
testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType diskMapType,
@@ -2953,9 +2850,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
List<IndexedRecord> targetRecords = records.subList(offset, offset +
targetRecordsCount);
logFiles.add(writer.getLogFile());
- if (enableBlockSequenceNumbers) {
- header = getUpdatedHeader(header, blockSeqNo++);
- }
writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, targetRecords,
header));
filesWritten++;
}
@@ -2965,13 +2859,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
return logFiles;
}
- private static Map<HeaderMetadataType, String>
getUpdatedHeader(Map<HeaderMetadataType, String> header, int
blockSequenceNumber) {
- Map<HeaderMetadataType, String> updatedHeader = new HashMap<>();
- updatedHeader.putAll(header);
- updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER,
String.valueOf(blockSequenceNumber));
- return updatedHeader;
- }
-
/**
* Utility to convert the given iterator to a List.
*/