codope commented on code in PR #9545:
URL: https://github.com/apache/hudi/pull/9545#discussion_r1306873365


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -129,6 +129,9 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
   private boolean useWriterSchema = false;
 
   private Properties recordProperties = new Properties();
+  // Block Sequence number will be used to detect duplicate log blocks(by log reader) added due to spark task retries.
+  // It should always start with 0 for a given file slice. for roll overs and delete blocks, we increment by 1.

Review Comment:
   ```suggestion
     // Block sequence number will be used to detect duplicate log blocks (by log reader) added due to spark task retries.
     // It should always start with 0 for a given file slice. For rollovers and delete blocks, it is incremented by 1.
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -108,8 +110,6 @@ public abstract class AbstractHoodieLogRecordReader {
   private final TypedProperties payloadProps;
   // Log File Paths
   protected final List<String> logFilePaths;
-  // Read Lazily flag
-  private final boolean readBlocksLazily;

Review Comment:
   Why is this being removed? Did we switch to lazy reading everywhere?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -381,6 +411,102 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
     }
   }
 
+  /**
+   * There could be spurious log blocks due to spark task retries. So, we will use BLOCK_SEQUENCE_NUMBER in the log block header to detect such spurious log blocks and return
+   * a deduped set of log blocks.
+   * @param allValidLogBlocks all valid log blocks parsed so far.
+   * @param blockSequenceMapPerCommit map containing block sequence numbers for every commit.
+   * @return a Pair of boolean and list of deduped valid log blocks, where a boolean of true means dups have been detected.
+   */
+  private Pair<Boolean, List<HoodieLogBlock>> reconcileSpuriousBlocksAndGetValidOnes(List<HoodieLogBlock> allValidLogBlocks,
+                                                                                     Map<String, List<List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
+    boolean dupsFound = blockSequenceMapPerCommit.values().stream().anyMatch(perCommitBlockList -> perCommitBlockList.size() > 1);
+    if (dupsFound) {
+      // duplicates are found. we need to remove duplicate log blocks.
+      for (Map.Entry<String, List<List<Pair<Integer, HoodieLogBlock>>>> entry : blockSequenceMapPerCommit.entrySet()) {
+        List<List<Pair<Integer, HoodieLogBlock>>> perCommitBlockSequences = entry.getValue();
+        if (perCommitBlockSequences.size() > 1) {
+          // only those that have more than 1 sequence need deduping.
+          int maxSequenceIndex = 0;
+          int maxSequenceCount = -1;
+          int totalSequences = perCommitBlockSequences.size();
+          for (int i = 0; i < totalSequences; i++) {
+            if (maxSequenceCount < perCommitBlockSequences.get(i).size()) {
+              maxSequenceCount = perCommitBlockSequences.get(i).size();
+              maxSequenceIndex = i;
+            }
+          }
+          // for every other sequence (!= maxSequenceIndex), remove the corresponding log blocks from allValidLogBlocks
+          for (int i = 0; i < perCommitBlockSequences.size(); i++) {
+            if (i != maxSequenceIndex) {
+              List<HoodieLogBlock> logBlocksToRemove = perCommitBlockSequences.get(i).stream().map(Pair::getValue).collect(Collectors.toList());
+              allValidLogBlocks.removeAll(logBlocksToRemove);
+            }
+          }
+        }
+      }
+      return Pair.of(true, allValidLogBlocks);
+    } else {
+      return Pair.of(false, allValidLogBlocks);
+    }
+  }
+
+  /**
+   * Updates the map tracking block sequence numbers.
+   * Here is the map structure.
+   * Map<String, List<List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit
+   * Key: Commit time.
+   * Value: List<List<Pair<Integer, HoodieLogBlock>>>
+   *   Value refers to a List of different block sequences tracked.
+   *
+   *  For eg, if there were two attempts for a file slice while writing (due to spark task retries), here is how the map might look:
+   *  key: commit1
+   *  value : {
+   *    entry1: List = { {0, lb1}, {1, lb2} },
+   *    entry2: List = { {0, lb3}, {1, lb4}, {2, lb5} }
+   *  }
+   *  Meaning: for commit1, there were two attempts with the Append Handle while writing. In the first attempt, lb1 and lb2 were added, and in the 2nd attempt lb3, lb4 and lb5 were added.
+   *  We keep populating this entire map and finally detect spurious log blocks and ignore them.
+   *  In most cases, we might just see one sequence for a given commit.
+   *
+   * @param logBlock log block of interest to be added.
+   * @param instantTime commit time of interest.
+   * @param blockSeqNo block sequence number.
+   * @param blockSequenceMapPerCommit map tracking per-commit block sequences.
+   */
+  private void updateBlockSequenceTracker(HoodieLogBlock logBlock, String instantTime, int blockSeqNo,
+                                          Map<String, List<List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
+    if (blockSeqNo != -1) { // update the block sequence tracker for log blocks containing the same.
+      if (!blockSequenceMapPerCommit.containsKey(instantTime)) {
+        blockSequenceMapPerCommit.put(instantTime, new ArrayList<>());
+      }
+      List<List<Pair<Integer, HoodieLogBlock>>> curCommitBlockList = blockSequenceMapPerCommit.get(instantTime);
+      if (blockSeqNo == 0) {
+        // start a new list
+        List<Pair<Integer, HoodieLogBlock>> blocksList = new ArrayList<>();
+        blocksList.add(Pair.of(blockSeqNo, logBlock));
+        curCommitBlockList.add(blocksList);
+      } else {
+        // append to the existing list
+        curCommitBlockList.get(curCommitBlockList.size() - 1).add(Pair.of(blockSeqNo, logBlock));
+      }
+      // update the latest to the block sequence tracker
+      blockSequenceMapPerCommit.put(instantTime, curCommitBlockList);
+    } else {
+      // all older blocks are considered valid. there should be only one list for older commits where the block sequence number is not present.
+      if (!blockSequenceMapPerCommit.containsKey(instantTime)) {
+        blockSequenceMapPerCommit.put(instantTime, new ArrayList<>());
+      }
+      List<List<Pair<Integer, HoodieLogBlock>>> curCommitBlockList = blockSequenceMapPerCommit.get(instantTime);
+      if (curCommitBlockList.isEmpty()) {
+        curCommitBlockList.add(new ArrayList<>());
+      }
+      curCommitBlockList.get(0).add(Pair.of(blockSeqNo, logBlock));
+      // update the latest to the block sequence tracker
+      blockSequenceMapPerCommit.put(instantTime, curCommitBlockList);
+    }
+  }
+
  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {

Review Comment:
   Don't we need the logic for scanInternalV2?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -381,6 +411,102 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
     }
   }
 
+  /**
+   * There could be spurious log blocks due to spark task retries. So, we will use BLOCK_SEQUENCE_NUMBER in the log block header to detect such spurious log blocks and return
+   * a deduped set of log blocks.
+   * @param allValidLogBlocks all valid log blocks parsed so far.
+   * @param blockSequenceMapPerCommit map containing block sequence numbers for every commit.
+   * @return a Pair of boolean and list of deduped valid log blocks, where a boolean of true means dups have been detected.
+   */
+  private Pair<Boolean, List<HoodieLogBlock>> reconcileSpuriousBlocksAndGetValidOnes(List<HoodieLogBlock> allValidLogBlocks,
+                                                                                     Map<String, List<List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
+    boolean dupsFound = blockSequenceMapPerCommit.values().stream().anyMatch(perCommitBlockList -> perCommitBlockList.size() > 1);
+    if (dupsFound) {
+      // duplicates are found. we need to remove duplicate log blocks.
+      for (Map.Entry<String, List<List<Pair<Integer, HoodieLogBlock>>>> entry : blockSequenceMapPerCommit.entrySet()) {
+        List<List<Pair<Integer, HoodieLogBlock>>> perCommitBlockSequences = entry.getValue();
+        if (perCommitBlockSequences.size() > 1) {
+          // only those that have more than 1 sequence need deduping.
+          int maxSequenceIndex = 0;
+          int maxSequenceCount = -1;
+          int totalSequences = perCommitBlockSequences.size();
+          for (int i = 0; i < totalSequences; i++) {
+            if (maxSequenceCount < perCommitBlockSequences.get(i).size()) {
+              maxSequenceCount = perCommitBlockSequences.get(i).size();
+              maxSequenceIndex = i;
+            }
+          }
+          // for every other sequence (!= maxSequenceIndex), remove the corresponding log blocks from allValidLogBlocks
+          for (int i = 0; i < perCommitBlockSequences.size(); i++) {
+            if (i != maxSequenceIndex) {
+              List<HoodieLogBlock> logBlocksToRemove = perCommitBlockSequences.get(i).stream().map(Pair::getValue).collect(Collectors.toList());
+              allValidLogBlocks.removeAll(logBlocksToRemove);
+            }
+          }
+        }
+      }
+      return Pair.of(true, allValidLogBlocks);
+    } else {
+      return Pair.of(false, allValidLogBlocks);
+    }
+  }
+
+  /**
+   * Updates the map tracking block sequence numbers.
+   * Here is the map structure.
+   * Map<String, List<List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit
+   * Key: Commit time.
+   * Value: List<List<Pair<Integer, HoodieLogBlock>>>
+   *   Value refers to a List of different block sequences tracked.
+   *
+   *  For eg, if there were two attempts for a file slice while writing (due to spark task retries), here is how the map might look:
+   *  key: commit1
+   *  value : {
+   *    entry1: List = { {0, lb1}, {1, lb2} },
+   *    entry2: List = { {0, lb3}, {1, lb4}, {2, lb5} }
+   *  }

Review Comment:
   While the comment helps, can we simplify this a bit? Maybe add another private class encapsulating blockSequenceNumber and HoodieLogBlock. It would make the code much easier to read. Also, we can eliminate the repeated get/containsKey calls by using `putIfAbsent`.
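
   A minimal sketch of what this suggestion could look like (names are illustrative, not from the PR): a small wrapper class replaces `Pair<Integer, HoodieLogBlock>` (`Object` stands in for `HoodieLogBlock` so the snippet is self-contained), and `computeIfAbsent`, a close cousin of the `putIfAbsent` the reviewer mentions, collapses the containsKey/get/put dance into one call.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class BlockSequenceTrackerSketch {

     // Hypothetical wrapper pairing a block sequence number with its log block.
     static class SequencedBlock {
       final int seqNo;
       final Object block; // placeholder for HoodieLogBlock

       SequencedBlock(int seqNo, Object block) {
         this.seqNo = seqNo;
         this.block = block;
       }
     }

     // Same tracking rules as updateBlockSequenceTracker in the PR:
     //   seqNo == 0  -> a (re)attempt starts a new sequence list
     //   seqNo  > 0  -> the block is appended to the current (last) sequence
     //   seqNo == -1 -> legacy block without a sequence number; all go to list 0
     static void track(Object block, String instantTime, int blockSeqNo,
                       Map<String, List<List<SequencedBlock>>> tracker) {
       List<List<SequencedBlock>> sequences =
           tracker.computeIfAbsent(instantTime, k -> new ArrayList<>());
       if (blockSeqNo == 0 || sequences.isEmpty()) {
         sequences.add(new ArrayList<>());
       }
       int target = (blockSeqNo == -1) ? 0 : sequences.size() - 1;
       sequences.get(target).add(new SequencedBlock(blockSeqNo, block));
     }

     public static void main(String[] args) {
       Map<String, List<List<SequencedBlock>>> tracker = new HashMap<>();
       // first attempt wrote lb1, lb2; a spark task retry wrote lb3, lb4, lb5
       track("lb1", "commit1", 0, tracker);
       track("lb2", "commit1", 1, tracker);
       track("lb3", "commit1", 0, tracker);
       track("lb4", "commit1", 1, tracker);
       track("lb5", "commit1", 2, tracker);
       List<List<SequencedBlock>> sequences = tracker.get("commit1");
       System.out.println("attempts: " + sequences.size());                    // 2
       System.out.println("blocks in winning attempt: "
           + sequences.get(sequences.size() - 1).size());                      // 3
     }
   }
   ```

   The dedup pass would then keep only the longest sequence per commit, exactly as in the PR's reconcile method.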



##########
hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java:
##########
@@ -687,6 +688,108 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
     scanner.close();
   }
 
+  @Test
+  public void testBasicAppendsWithBlockSeqNos() throws IOException, URISyntaxException, InterruptedException {
+    testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords, numFiles, enableBlockSeqNos) -> {
+      return writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos);
+    });
+  }
+
+  @Test
+  public void testAppendsWithSpruiousLogBlocksExactDup() throws IOException, URISyntaxException, InterruptedException {

Review Comment:
   ```suggestion
     public void testAppendsWithSpuriousLogBlocksExactDup() throws IOException, URISyntaxException, InterruptedException {
   ```



##########
hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java:
##########
@@ -687,6 +688,108 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
     scanner.close();
   }
 
+  @Test
+  public void testBasicAppendsWithBlockSeqNos() throws IOException, URISyntaxException, InterruptedException {
+    testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords, numFiles, enableBlockSeqNos) -> {
+      return writeLogFiles(partitionPath, schema, genRecords, numFiles, enableBlockSeqNos);
+    });
+  }
+
+  @Test
+  public void testAppendsWithSpruiousLogBlocksExactDup() throws IOException, URISyntaxException, InterruptedException {

Review Comment:
   nit: `spurious` spelling here and other places in the test.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -632,6 +635,13 @@ private HoodieLogBlock.HoodieLogBlockType pickLogDataBlockFormat() {
     }
   }
 
+  private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber) {

Review Comment:
   Just curious, why create a copy? We could simply update the map in place.
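
   To illustrate the trade-off the reviewer is asking about, here is a hedged, self-contained sketch of both variants. The enum values and method names are stand-ins for Hudi's `HoodieLogBlock.HeaderMetadataType` and the PR's `getUpdatedHeader`, not the actual API; the in-place variant is only safe if the caller never reuses the same header map for another block.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class HeaderUpdateSketch {

     // Stand-in for HoodieLogBlock.HeaderMetadataType (values assumed for illustration).
     enum HeaderMetadataType { INSTANT_TIME, BLOCK_SEQUENCE_NUMBER }

     // Copying variant, in the spirit of the PR's getUpdatedHeader: the caller's map is untouched.
     static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber) {
       Map<HeaderMetadataType, String> updated = new HashMap<>(header);
       updated.put(HeaderMetadataType.BLOCK_SEQUENCE_NUMBER, String.valueOf(blockSequenceNumber));
       return updated;
     }

     // In-place variant the reviewer suggests: mutates and returns the same map.
     static Map<HeaderMetadataType, String> updateHeaderInPlace(Map<HeaderMetadataType, String> header, int blockSequenceNumber) {
       header.put(HeaderMetadataType.BLOCK_SEQUENCE_NUMBER, String.valueOf(blockSequenceNumber));
       return header;
     }

     public static void main(String[] args) {
       Map<HeaderMetadataType, String> header = new HashMap<>();
       header.put(HeaderMetadataType.INSTANT_TIME, "001");

       getUpdatedHeader(header, 3);
       System.out.println(header.containsKey(HeaderMetadataType.BLOCK_SEQUENCE_NUMBER)); // false

       updateHeaderInPlace(header, 3);
       System.out.println(header.get(HeaderMetadataType.BLOCK_SEQUENCE_NUMBER)); // 3
     }
   }
   ```

   The copy costs one small allocation per block; the in-place version avoids it at the price of aliasing, which matters if rollover logic hands the same header map to more than one block.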



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
