codope commented on code in PR #9545:
URL: https://github.com/apache/hudi/pull/9545#discussion_r1306873365
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -129,6 +129,9 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
private boolean useWriterSchema = false;
private Properties recordProperties = new Properties();
+ // Block Sequence number will be used to detect duplicate log blocks(by log
reader) added due to spark task retries.
+ // It should always start with 0 for a given file slice. for roll overs and
delete blocks, we increment by 1.
Review Comment:
```suggestion
// Block sequence number will be used to detect duplicate log blocks (by
log reader) added due to spark task retries.
// It should always start with 0 for a given file slice. For rollovers and
delete blocks, it is incremented by 1.
```
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -108,8 +110,6 @@ public abstract class AbstractHoodieLogRecordReader {
private final TypedProperties payloadProps;
// Log File Paths
protected final List<String> logFilePaths;
- // Read Lazily flag
- private final boolean readBlocksLazily;
Review Comment:
Why is this being removed? Did we switch to lazy reading in all the places?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -381,6 +411,102 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
}
}
+ /**
+ * There could be spurious log blocks due to spark task retries. So, we will
use BLOCK_SEQUENCE_NUMBER in the log block header to detect such spurious log
blocks and return
+ * a deduped set of log blocks.
+ * @param allValidLogBlocks all valid log blocks parsed so far.
+ * @param blockSequenceMapPerCommit map containing block sequence numbers
for every commit.
+ @return a Pair of boolean and list of deduped valid log blocks, where a
boolean of true means duplicates were detected.
+ */
+ private Pair<Boolean, List<HoodieLogBlock>>
reconcileSpuriousBlocksAndGetValidOnes(List<HoodieLogBlock> allValidLogBlocks,
+
Map<String, List<List<Pair<Integer,HoodieLogBlock>>>>
blockSequenceMapPerCommit) {
+ boolean dupsFound =
blockSequenceMapPerCommit.values().stream().anyMatch(perCommitBlockList ->
perCommitBlockList.size() > 1);
+ if (dupsFound) {
+ // duplicates are found. we need to remove duplicate log blocks.
+ for (Map.Entry<String, List<List<Pair<Integer,HoodieLogBlock>>>> entry:
blockSequenceMapPerCommit.entrySet()) {
+ List<List<Pair<Integer,HoodieLogBlock>>> perCommitBlockSequences =
entry.getValue();
+ if (perCommitBlockSequences.size() > 1) {
+ // only those that have more than 1 sequence need deduping.
+ int maxSequenceIndex = 0;
+ int maxSequenceCount = -1;
+ int totalSequences = perCommitBlockSequences.size();
+ for (int i = 0; i < totalSequences; i++) {
+ if (maxSequenceCount < perCommitBlockSequences.get(i).size()) {
+ maxSequenceCount = perCommitBlockSequences.get(i).size();
+ maxSequenceIndex = i;
+ }
+ }
+ // for other sequence (!= maxSequenceIndex), we need to remove the
corresponding logBlocks from allValidLogBlocks
+ for (int i = 0;i < perCommitBlockSequences.size(); i++) {
+ if (i != maxSequenceIndex) {
+ List<HoodieLogBlock> logBlocksToRemove =
perCommitBlockSequences.get(i).stream().map(pair ->
pair.getValue()).collect(Collectors.toList());
+ logBlocksToRemove.forEach(logBlockToRemove ->
allValidLogBlocks.remove(logBlocksToRemove));
+ }
+ }
+ }
+ }
+ return Pair.of(true, allValidLogBlocks);
+ } else {
+ return Pair.of(false, allValidLogBlocks);
+ }
+ }
+
+ /**
+ * Updates map tracking block seq no.
+ * Here is the map structure.
+ * Map<String, List<List<Pair<Integer,HoodieLogBlock>>>>
blockSequenceMapPerCommit
+ * Key: Commit time.
+ * Value: List<List<Pair<Integer,HoodieLogBlock>>>
+ * Value refers to a List of different block sequences tracked.
+ *
+ * For eg, if there were two attempts for a file slice while writing(due to
spark task retries), here is how the map might look like
+ * key: commit1
+ * value : {
+ * entry1: List = { {0, lb1}, {1, lb2} },
+ * entry2: List = { {0, lb3}, {1, lb4}, {2, lb5}}
+ * }
+ * Meaning: for commit1, there were two attempts with the Append Handle while
writing. In the first attempt, lb1 and lb2 were added. And in the 2nd attempt lb3, lb4
and lb5 were added.
+ * We keep populating this entire map and finally detect spurious log
blocks and ignore them.
+ * In most cases, we might just see one set of sequence for a given commit.
+ *
+ * @param logBlock log block of interest to be added.
+ * @param instantTime commit time of interest.
+ * @param blockSeqNo block sequence number.
+ * @param blockSequenceMapPerCommit map tracking per commit block sequences.
+ */
+ private void updateBlockSequenceTracker(HoodieLogBlock logBlock, String
instantTime, int blockSeqNo,
+ Map<String,
List<List<Pair<Integer,HoodieLogBlock>>>> blockSequenceMapPerCommit) {
+ if (blockSeqNo != -1) { // update the block sequence tracker for log
blocks containing the same.
+ if (!blockSequenceMapPerCommit.containsKey(instantTime)) {
+ blockSequenceMapPerCommit.put(instantTime, new ArrayList<>());
+ }
+ List<List<Pair<Integer,HoodieLogBlock>>> curCommitBlockList =
blockSequenceMapPerCommit.get(instantTime);
+ if (blockSeqNo == 0) {
+ // start a new list
+ List<Pair<Integer,HoodieLogBlock>> blocksList = new ArrayList<>();
+ blocksList.add(Pair.of(blockSeqNo, logBlock));
+ curCommitBlockList.add(blocksList);
+ } else {
+ // append to existing list
+ curCommitBlockList.get(curCommitBlockList.size() -
1).add(Pair.of(blockSeqNo, logBlock));
+ }
+ // update the latest to block sequence tracker
+ blockSequenceMapPerCommit.put(instantTime, curCommitBlockList);
+ } else {
+ // all of older blocks are considered valid. there should be only one
list for older commits where block sequence number is not present.
+ if (!blockSequenceMapPerCommit.containsKey(instantTime)) {
+ blockSequenceMapPerCommit.put(instantTime, new ArrayList<>());
+ }
+ List<List<Pair<Integer,HoodieLogBlock>>> curCommitBlockList =
blockSequenceMapPerCommit.get(instantTime);
+ if (curCommitBlockList.isEmpty()) {
+ curCommitBlockList.add(new ArrayList<>());
+ }
+ curCommitBlockList.get(0).add(Pair.of(blockSeqNo, logBlock));
+ // update the latest to block sequence tracker
+ blockSequenceMapPerCommit.put(instantTime, curCommitBlockList);
+ }
+ }
+
private void scanInternalV2(Option<KeySpec> keySpecOption, boolean
skipProcessingBlocks) {
Review Comment:
Don't we need the logic for scanInternalV2?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -381,6 +411,102 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
}
}
+ /**
+ * There could be spurious log blocks due to spark task retries. So, we will
use BLOCK_SEQUENCE_NUMBER in the log block header to detect such spurious log
blocks and return
+ * a deduped set of log blocks.
+ * @param allValidLogBlocks all valid log blocks parsed so far.
+ * @param blockSequenceMapPerCommit map containing block sequence numbers
for every commit.
+ @return a Pair of boolean and list of deduped valid log blocks, where a
boolean of true means duplicates were detected.
+ */
+ private Pair<Boolean, List<HoodieLogBlock>>
reconcileSpuriousBlocksAndGetValidOnes(List<HoodieLogBlock> allValidLogBlocks,
+
Map<String, List<List<Pair<Integer,HoodieLogBlock>>>>
blockSequenceMapPerCommit) {
+ boolean dupsFound =
blockSequenceMapPerCommit.values().stream().anyMatch(perCommitBlockList ->
perCommitBlockList.size() > 1);
+ if (dupsFound) {
+ // duplicates are found. we need to remove duplicate log blocks.
+ for (Map.Entry<String, List<List<Pair<Integer,HoodieLogBlock>>>> entry:
blockSequenceMapPerCommit.entrySet()) {
+ List<List<Pair<Integer,HoodieLogBlock>>> perCommitBlockSequences =
entry.getValue();
+ if (perCommitBlockSequences.size() > 1) {
+ // only those that have more than 1 sequence need deduping.
+ int maxSequenceIndex = 0;
+ int maxSequenceCount = -1;
+ int totalSequences = perCommitBlockSequences.size();
+ for (int i = 0; i < totalSequences; i++) {
+ if (maxSequenceCount < perCommitBlockSequences.get(i).size()) {
+ maxSequenceCount = perCommitBlockSequences.get(i).size();
+ maxSequenceIndex = i;
+ }
+ }
+ // for other sequence (!= maxSequenceIndex), we need to remove the
corresponding logBlocks from allValidLogBlocks
+ for (int i = 0;i < perCommitBlockSequences.size(); i++) {
+ if (i != maxSequenceIndex) {
+ List<HoodieLogBlock> logBlocksToRemove =
perCommitBlockSequences.get(i).stream().map(pair ->
pair.getValue()).collect(Collectors.toList());
+ logBlocksToRemove.forEach(logBlockToRemove ->
allValidLogBlocks.remove(logBlocksToRemove));
+ }
+ }
+ }
+ }
+ return Pair.of(true, allValidLogBlocks);
+ } else {
+ return Pair.of(false, allValidLogBlocks);
+ }
+ }
+
+ /**
+ * Updates map tracking block seq no.
+ * Here is the map structure.
+ * Map<String, List<List<Pair<Integer,HoodieLogBlock>>>>
blockSequenceMapPerCommit
+ * Key: Commit time.
+ * Value: List<List<Pair<Integer,HoodieLogBlock>>>
+ * Value refers to a List of different block sequences tracked.
+ *
+ * For eg, if there were two attempts for a file slice while writing(due to
spark task retries), here is how the map might look like
+ * key: commit1
+ * value : {
+ * entry1: List = { {0, lb1}, {1, lb2} },
+ * entry2: List = { {0, lb3}, {1, lb4}, {2, lb5}}
+ * }
Review Comment:
While the comment helps, can we simplify this a bit? Maybe add another
private class encapsulating blockSequenceNumber and HoodieLogBlock. It would
make reading the code a lot better. Also, we can eliminate the repeated
get/containsKey calls by using `putIfAbsent`.
##########
hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java:
##########
@@ -687,6 +688,108 @@ public void
testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
scanner.close();
}
+ @Test
+ public void testBasicAppendsWithBlockSeqNos() throws IOException,
URISyntaxException, InterruptedException {
+ testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords,
numFiles, enableBlockSeqNos) -> {
+ return writeLogFiles(partitionPath, schema, genRecords, numFiles,
enableBlockSeqNos);
+ });
+ }
+
+ @Test
+ public void testAppendsWithSpruiousLogBlocksExactDup() throws IOException,
URISyntaxException, InterruptedException {
Review Comment:
```suggestion
public void testAppendsWithSpuriousLogBlocksExactDup() throws IOException,
URISyntaxException, InterruptedException {
```
##########
hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java:
##########
@@ -687,6 +688,108 @@ public void
testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
scanner.close();
}
+ @Test
+ public void testBasicAppendsWithBlockSeqNos() throws IOException,
URISyntaxException, InterruptedException {
+ testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords,
numFiles, enableBlockSeqNos) -> {
+ return writeLogFiles(partitionPath, schema, genRecords, numFiles,
enableBlockSeqNos);
+ });
+ }
+
+ @Test
+ public void testAppendsWithSpruiousLogBlocksExactDup() throws IOException,
URISyntaxException, InterruptedException {
Review Comment:
nit: `spurious` spelling here and other places in the test.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -632,6 +635,13 @@ private HoodieLogBlock.HoodieLogBlockType
pickLogDataBlockFormat() {
}
}
+ private static Map<HeaderMetadataType, String>
getUpdatedHeader(Map<HeaderMetadataType, String> header, int
blockSequenceNumber) {
Review Comment:
Just curious — why create a copy? We can simply update the map in place.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]