This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 45ad35b960c [HUDI-7549] Reverting spurious log block deduction with 
LogRecordReader (#10922)
45ad35b960c is described below

commit 45ad35b960cc0498caa00bd4ad7cb10fb0ad91d9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon May 13 23:13:56 2024 -0700

    [HUDI-7549] Reverting spurious log block deduction with LogRecordReader 
(#10922)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  25 +--
 .../org/apache/hudi/DummyTaskContextSupplier.java  |   5 -
 .../hudi/client/FlinkTaskContextSupplier.java      |   5 -
 .../java/org/apache/hudi/io/FlinkAppendHandle.java |   4 -
 .../client/common/JavaTaskContextSupplier.java     |   6 -
 .../testutils/HoodieJavaClientTestHarness.java     |   5 -
 .../hudi/client/SparkTaskContextSupplier.java      |   5 -
 .../common/engine/LocalTaskContextSupplier.java    |   5 -
 .../hudi/common/engine/TaskContextSupplier.java    |   5 -
 .../table/log/AbstractHoodieLogRecordReader.java   | 172 +--------------------
 .../common/functional/TestHoodieLogFormat.java     | 113 --------------
 11 files changed, 8 insertions(+), 342 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 3cc018010bd..fa69701af7a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -57,7 +57,6 @@ import org.apache.hudi.exception.HoodieAppendException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
@@ -130,11 +129,6 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
   private boolean useWriterSchema = false;
 
   private final Properties recordProperties = new Properties();
-  // Block Sequence number will be used to detect duplicate log blocks(by log 
reader) added due to spark task retries.
-  // It should always start with 0 for a given file slice. for rolling-over 
and delete blocks, we increment by 1.
-  private int blockSequenceNumber = 0;
-  // On task failures, a given task could be retried. So, this attempt number 
will track the number of attempts.
-  private int attemptNumber = 0;
 
   /**
    * This is used by log compaction only.
@@ -146,7 +140,6 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     this.useWriterSchema = true;
     this.isLogCompaction = true;
     this.header.putAll(header);
-    this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
   }
 
   public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
@@ -163,7 +156,6 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     this.sizeEstimator = new DefaultSizeEstimator();
     this.statuses = new ArrayList<>();
     this.recordProperties.putAll(config.getProps());
-    this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
     this.shouldWriteRecordPositions = config.shouldWriteRecordPositions();
   }
 
@@ -470,14 +462,12 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
             : 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
 
         blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, 
shouldWriteRecordPositions,
-            getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, 
config,
-                addBlockIdentifier()), keyField));
+            getUpdatedHeader(header, config), keyField));
       }
 
       if (appendDeleteBlocks && recordsToDeleteWithPositions.size() > 0) {
         blocks.add(new HoodieDeleteBlock(recordsToDeleteWithPositions, 
shouldWriteRecordPositions,
-            getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, 
config,
-            addBlockIdentifier())));
+            getUpdatedHeader(header, config)));
       }
 
       if (blocks.size() > 0) {
@@ -576,10 +566,6 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     return true;
   }
 
-  protected boolean addBlockIdentifier() {
-    return true;
-  }
-
   private void writeToBuffer(HoodieRecord<T> record) {
     if (!partitionPath.equals(record.getPartitionPath())) {
       HoodieUpsertException failureEx = new HoodieUpsertException("mismatched 
partition path, record partition: "
@@ -654,12 +640,9 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     }
   }
 
-  private static Map<HeaderMetadataType, String> 
getUpdatedHeader(Map<HeaderMetadataType, String> header, int 
blockSequenceNumber, long attemptNumber,
-                                                                  
HoodieWriteConfig config, boolean addBlockIdentifier) {
+  private static Map<HeaderMetadataType, String> 
getUpdatedHeader(Map<HeaderMetadataType, String> header,
+                                                                  
HoodieWriteConfig config) {
     Map<HeaderMetadataType, String> updatedHeader = new HashMap<>(header);
-    if (addBlockIdentifier && 
!HoodieTableMetadata.isMetadataTable(config.getBasePath())) { // add block 
sequence numbers only for data table.
-      updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, attemptNumber + 
"," + blockSequenceNumber);
-    }
     if (config.shouldWritePartialUpdates()) {
       // When enabling writing partial updates to the data blocks, the 
"IS_PARTIAL" flag is also
       // written to the block header so that the reader can differentiate 
partial updates, i.e.,
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java
index d87b6147302..d2c07e35509 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java
@@ -45,9 +45,4 @@ public class DummyTaskContextSupplier extends 
TaskContextSupplier {
   public Option<String> getProperty(EngineProperty prop) {
     return null;
   }
-
-  @Override
-  public Supplier<Integer> getAttemptNumberSupplier() {
-    return null;
-  }
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
index 03c835c5553..aab248fc3cf 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
@@ -62,9 +62,4 @@ public class FlinkTaskContextSupplier extends 
TaskContextSupplier {
     return Option.empty();
   }
 
-  @Override
-  public Supplier<Integer> getAttemptNumberSupplier() {
-    return () -> -1;
-  }
-
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index 9f99a1624d3..0b517b5d4ae 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -92,10 +92,6 @@ public class FlinkAppendHandle<T, I, K, O>
         && hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
   }
 
-  protected boolean addBlockIdentifier() {
-    return false;
-  }
-
   @Override
   public List<WriteStatus> close() {
     try {
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
index b40419a8015..628201ccc25 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java
@@ -44,10 +44,4 @@ public class JavaTaskContextSupplier extends 
TaskContextSupplier {
   public Option<String> getProperty(EngineProperty prop) {
     return Option.empty();
   }
-
-  @Override
-  public Supplier<Integer> getAttemptNumberSupplier() {
-    return () -> 0;
-  }
-
 }
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index 05cbd7af8e8..61429b3fef2 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -180,11 +180,6 @@ public abstract class HoodieJavaClientTestHarness extends 
HoodieWriterClientTest
     public Option<String> getProperty(EngineProperty prop) {
       return Option.empty();
     }
-
-    @Override
-    public Supplier<Integer> getAttemptNumberSupplier() {
-      return () -> (int)attemptId;
-    }
   }
 
   protected void initFileSystem(String basePath, StorageConfiguration<?> 
hadoopConf) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
index 7cfa411511a..5b299d2e291 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java
@@ -50,11 +50,6 @@ public class SparkTaskContextSupplier extends 
TaskContextSupplier implements Ser
     return () -> TaskContext.get().taskAttemptId();
   }
 
-  @Override
-  public Supplier<Integer> getAttemptNumberSupplier() {
-    return () -> TaskContext.get().attemptNumber();
-  }
-
   @Override
   public Option<String> getProperty(EngineProperty prop) {
     if (prop == EngineProperty.TOTAL_MEMORY_AVAILABLE) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java
index bff42692340..6b853b566e4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java
@@ -46,9 +46,4 @@ public final class LocalTaskContextSupplier extends 
TaskContextSupplier {
     return Option.empty();
   }
 
-  @Override
-  public Supplier<Integer> getAttemptNumberSupplier() {
-    return () -> 0;
-  }
-
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java
index 24a6d0e527a..813236c07a8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/TaskContextSupplier.java
@@ -35,9 +35,4 @@ public abstract class TaskContextSupplier implements 
Serializable {
   public abstract Supplier<Long> getAttemptIdSupplier();
 
   public abstract Option<String> getProperty(EngineProperty prop);
-
-  /**
-   * @returns the attempt number for the task of interest. Attempt starts with 
0 and goes up by 1 on retries.
-   */
-  public abstract Supplier<Integer> getAttemptNumberSupplier();
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 6f4e00dd053..c545640e3eb 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -34,7 +34,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
@@ -66,7 +65,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BLOCK_IDENTIFIER;
+import static 
org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME;
@@ -225,8 +224,6 @@ public abstract class AbstractHoodieLogRecordReader {
 
   private void scanInternalV1(Option<KeySpec> keySpecOpt) {
     currentInstantLogBlocks = new ArrayDeque<>();
-    List<HoodieLogBlock> validLogBlockInstants = new ArrayList<>();
-    Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> 
blockSequenceMapPerCommit = new HashMap<>();
     AtomicBoolean blockIdentifiersPresent = new AtomicBoolean(false);
 
     progress = 0.0f;
@@ -253,14 +250,6 @@ public abstract class AbstractHoodieLogRecordReader {
         // Use the HoodieLogFileReader to iterate through the blocks in the 
log file
         HoodieLogBlock logBlock = logFormatReaderWrapper.next();
         final String instantTime = 
logBlock.getLogBlockHeader().get(INSTANT_TIME);
-        final String blockIdentifier = 
logBlock.getLogBlockHeader().getOrDefault(BLOCK_IDENTIFIER, 
StringUtils.EMPTY_STRING);
-        int blockSeqNumber = -1;
-        long attemptNumber = -1L;
-        if (!StringUtils.isNullOrEmpty(blockIdentifier)) {
-          String[] parts = blockIdentifier.split(",");
-          attemptNumber = Long.parseLong(parts[0]);
-          blockSeqNumber = Integer.parseInt(parts[1]);
-        }
         totalLogBlocks.incrementAndGet();
         if (logBlock.getBlockType() != CORRUPT_BLOCK
             && 
!HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME),
 HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
@@ -281,15 +270,11 @@ public abstract class AbstractHoodieLogRecordReader {
             LOG.info("Reading a data block from file {} at instant {}", 
logFile.getPath(), instantTime);
             // store the current block
             currentInstantLogBlocks.push(logBlock);
-            validLogBlockInstants.add(logBlock);
-            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber, 
attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent);
             break;
           case DELETE_BLOCK:
             LOG.info("Reading a delete block from file {}", logFile.getPath());
             // store deletes so can be rolled back
             currentInstantLogBlocks.push(logBlock);
-            validLogBlockInstants.add(logBlock);
-            updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber, 
attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent);
             break;
           case COMMAND_BLOCK:
             // Consider the following scenario
@@ -331,25 +316,6 @@ public abstract class AbstractHoodieLogRecordReader {
                   }
                   return false;
                 });
-
-                // remove entire entry from blockSequenceTracker
-                blockSequenceMapPerCommit.remove(targetInstantForCommandBlock);
-
-                /// remove all matching log blocks from valid list tracked so 
far
-                validLogBlockInstants = 
validLogBlockInstants.stream().filter(block -> {
-                  // handle corrupt blocks separately since they may not have 
metadata
-                  if (block.getBlockType() == CORRUPT_BLOCK) {
-                    LOG.info("Rolling back the last corrupted log block read 
in {}", logFile.getPath());
-                    return true;
-                  }
-                  if 
(targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME)))
 {
-                    // rollback older data block or delete block
-                    LOG.info("Rolling back an older log block read from {} 
with instantTime {}", logFile.getPath(), targetInstantForCommandBlock);
-                    return false;
-                  }
-                  return true;
-                }).collect(Collectors.toList());
-
                 final int numBlocksRolledBack = 
instantLogBlockSizeBeforeRollback - currentInstantLogBlocks.size();
                 totalRollbacks.addAndGet(numBlocksRolledBack);
                 LOG.info("Number of applied rollback blocks {}", 
numBlocksRolledBack);
@@ -366,9 +332,6 @@ public abstract class AbstractHoodieLogRecordReader {
             totalCorruptBlocks.incrementAndGet();
             // If there is a corrupt block - we will assume that this was the 
next data block
             currentInstantLogBlocks.push(logBlock);
-            validLogBlockInstants.add(logBlock);
-            // we don't need to update the block sequence tracker here, since 
the block sequence tracker is meant to remove additional/spurious valid 
logblocks.
-            // anyway, contents of corrupt blocks are not read.
             break;
           default:
             throw new UnsupportedOperationException("Block type not supported 
yet");
@@ -376,23 +339,9 @@ public abstract class AbstractHoodieLogRecordReader {
       }
       // merge the last read block when all the blocks are done reading
       if (!currentInstantLogBlocks.isEmpty()) {
-        boolean duplicateBlocksDetected = false;
-        if (blockIdentifiersPresent.get()) {
-          Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo = 
reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants, 
blockSequenceMapPerCommit);
-          duplicateBlocksDetected = dedupedLogBlocksInfo.getKey();
-          if (duplicateBlocksDetected) {
-            // if there are duplicate log blocks that needs to be removed, we 
re-create the queue for valid log blocks from dedupedLogBlocks
-            currentInstantLogBlocks = new ArrayDeque<>();
-            dedupedLogBlocksInfo.getValue().forEach(block -> 
currentInstantLogBlocks.push(block));
-            LOG.info("Merging the final data blocks");
-            processQueuedBlocksForInstant(currentInstantLogBlocks, 
scannedLogFiles.size(), keySpecOpt);
-          }
-        }
-        if (!duplicateBlocksDetected) {
-          // if there are no dups, we can take currentInstantLogBlocks as is.
-          LOG.info("Merging the final data blocks");
-          processQueuedBlocksForInstant(currentInstantLogBlocks, 
scannedLogFiles.size(), keySpecOpt);
-        }
+        // no dedup is applied here; use currentInstantLogBlocks as is.
+        LOG.info("Merging the final data blocks");
+        processQueuedBlocksForInstant(currentInstantLogBlocks, 
scannedLogFiles.size(), keySpecOpt);
       }
 
       // Done
@@ -415,119 +364,6 @@ public abstract class AbstractHoodieLogRecordReader {
     }
   }
 
-  /**
-   * There could be spurious log blocks due to spark task retries. So, we will 
use BLOCK_SEQUENCE_NUMBER in the log block header to deduce such spurious log 
blocks and return
-   * a deduped set of log blocks.
-   * @param allValidLogBlocks all valid log blocks parsed so far.
-   * @param blockSequenceMapPerCommit map containing block sequence numbers 
for every commit.
-   * @return a Pair of boolean and list of deduped valid block blocks, where 
boolean of true means, there have been dups detected.
-   */
-  private Pair<Boolean, List<HoodieLogBlock>> 
reconcileSpuriousBlocksAndGetValidOnes(List<HoodieLogBlock> allValidLogBlocks,
-                                                                      
Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> 
blockSequenceMapPerCommit) {
-
-    boolean dupsFound = 
blockSequenceMapPerCommit.values().stream().anyMatch(perCommitBlockList -> 
perCommitBlockList.size() > 1);
-    if (dupsFound) {
-      if (LOG.isDebugEnabled()) {
-        logBlockSequenceMapping(blockSequenceMapPerCommit);
-      }
-
-      // duplicates are found. we need to remove duplicate log blocks.
-      for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> 
entry: blockSequenceMapPerCommit.entrySet()) {
-        Map<Long, List<Pair<Integer, HoodieLogBlock>>> perCommitBlockSequences 
= entry.getValue();
-        if (perCommitBlockSequences.size() > 1) {
-          // only those that have more than 1 sequence needs deduping.
-          int maxSequenceCount = -1;
-          int maxAttemptNo = -1;
-          for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> 
perAttemptEntries : perCommitBlockSequences.entrySet()) {
-            Long attemptNo = perAttemptEntries.getKey();
-            int size = perAttemptEntries.getValue().size();
-            if (maxSequenceCount <= size) {
-              maxSequenceCount = size;
-              maxAttemptNo = Math.toIntExact(attemptNo);
-            }
-          }
-          // for other sequences (!= maxSequenceIndex), we need to remove the 
corresponding logBlocks from allValidLogBlocks
-          for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> 
perAttemptEntries : perCommitBlockSequences.entrySet()) {
-            Long attemptNo = perAttemptEntries.getKey();
-            if (maxAttemptNo != attemptNo) {
-              List<HoodieLogBlock> logBlocksToRemove = 
perCommitBlockSequences.get(attemptNo).stream().map(Pair::getValue).collect(Collectors.toList());
-              logBlocksToRemove.forEach(logBlockToRemove -> 
allValidLogBlocks.remove(logBlockToRemove));
-            }
-          }
-        }
-      }
-      return Pair.of(true, allValidLogBlocks);
-    } else {
-      return Pair.of(false, allValidLogBlocks);
-    }
-  }
-
-  private void logBlockSequenceMapping(Map<String, Map<Long, 
List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
-    LOG.warn("Duplicate log blocks found ");
-    for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> 
entry : blockSequenceMapPerCommit.entrySet()) {
-      if (entry.getValue().size() > 1) {
-        LOG.warn("\tCommit time {}", entry.getKey());
-        Map<Long, List<Pair<Integer, HoodieLogBlock>>> value = 
entry.getValue();
-        for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> attemptsSeq 
: value.entrySet()) {
-          LOG.warn("\t\tAttempt number {}", attemptsSeq.getKey());
-          attemptsSeq.getValue().forEach(entryValue -> LOG.warn("\t\t\tLog 
block sequence no : {}, log file {}",
-              entryValue.getKey(), 
entryValue.getValue().getBlockContentLocation().get().getLogFile().getPath().toString()));
-        }
-      }
-    }
-  }
-
-  /**
-   * Updates map tracking block seq no.
-   * Here is the map structure.
-   * Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> 
blockSequenceMapPerCommit
-   * Key: Commit time.
-   * Value: Map<Long, List<Pair<Integer, HoodieLogBlock>>>>
-   *   Value refers to a Map of different attempts for the commit of interest. 
List contains the block seq number and the resp HoodieLogBlock.
-   *
-   *  For eg, if there were two attempts for a file slice while writing(due to 
spark task retries), here is how the map might look like
-   *  key: commit1
-   *  value : {
-   *    0L = List = { {0, lb1}, {1, lb2} },
-   *    1L = List = { {0, lb3}, {1, lb4}, {2, lb5}}
-   *  }
-   *  Meaning: for commit1, there was two attempts with Append Handle while 
writing. In first attempt, lb1 and lb2 was added. And in 2nd attempt lb3, lb4 
and lb5 was added.
-   *  We keep populating this entire map and finally detect spurious log 
blocks and ignore them.
-   *  In most cases, we might just see one set of sequence for a given commit.
-   *
-   * @param logBlock log block of interest to be added.
-   * @param instantTime commit time of interest.
-   * @param blockSeqNumber block sequence number.
-   * @param blockSequenceMapPerCommit map tracking per commit block sequences.
-   */
-  private void updateBlockSequenceTracker(HoodieLogBlock logBlock, String 
instantTime, int blockSeqNumber, long attemptNumber,
-                                          Map<String, Map<Long, 
List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit,
-                                          AtomicBoolean 
blockIdentifiersPresent) {
-    if (blockSeqNumber != -1 && attemptNumber != -1) { // update the block 
sequence tracker for log blocks containing the same.
-      blockIdentifiersPresent.set(true);
-      blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new 
HashMap<>());
-      Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = 
blockSequenceMapPerCommit.get(instantTime);
-      if (curCommitBlockMap.containsKey(attemptNumber)) {
-        // append to existing map entry
-        curCommitBlockMap.get(attemptNumber).add(Pair.of(blockSeqNumber, 
logBlock));
-      } else {
-        // create a new map entry
-        curCommitBlockMap.put(attemptNumber, new ArrayList<>());
-        curCommitBlockMap.get(attemptNumber).add(Pair.of(blockSeqNumber, 
logBlock));
-      }
-      // update the latest to block sequence tracker
-      blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
-    } else {
-      // all of older blocks are considered valid. there should be only one 
list for older commits where block sequence number is not present.
-      blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new 
HashMap<>());
-      Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = 
blockSequenceMapPerCommit.get(instantTime);
-      curCommitBlockMap.computeIfAbsent(0L, entry -> new ArrayList<>());
-      curCommitBlockMap.get(0L).add(Pair.of(blockSeqNumber, logBlock));
-      // update the latest to block sequence tracker
-      blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
-    }
-  }
-
   private void scanInternalV2(Option<KeySpec> keySpecOption, boolean 
skipProcessingBlocks) {
     currentInstantLogBlocks = new ArrayDeque<>();
     progress = 0.0f;
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 84a9f6c9c4e..792d28550e8 100755
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -114,7 +114,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static java.util.stream.Collectors.toList;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.getJavaVersion;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.shouldUseExternalHdfs;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.useExternalHdfs;
@@ -711,108 +710,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     scanner.close();
   }
 
-  @Test
-  public void testBasicAppendsWithBlockSeqNos() throws IOException, 
URISyntaxException, InterruptedException {
-    testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords, 
numFiles, enableBlockSeqNos) -> {
-      return writeLogFiles(partitionPath, schema, genRecords, numFiles, 
enableBlockSeqNos);
-    });
-  }
-
-  @Test
-  public void testAppendsWithSpruiousLogBlocksExactDup() throws IOException, 
URISyntaxException, InterruptedException {
-    testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords, 
numFiles, enableBlockSeqNos) -> {
-      Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, 
genRecords, numFiles, enableBlockSeqNos);
-      // re add the same records again
-      logFiles.addAll(writeLogFiles(partitionPath, schema, genRecords, 
numFiles, enableBlockSeqNos));
-      return logFiles;
-    });
-  }
-
-  @Test
-  public void testAppendsWithSpruiousLogBlocksFirstAttemptPartial() throws 
IOException, URISyntaxException, InterruptedException {
-    testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords, 
numFiles, enableBlockSeqNos) -> {
-      Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, 
genRecords, numFiles, enableBlockSeqNos);
-      // removing 4th log block to simulate partial failure in 1st attempt
-      List<HoodieLogFile> logFileList = new ArrayList<>(logFiles);
-      logFiles.remove(logFileList.get(logFileList.size() - 1));
-      // re add the same records again
-      logFiles.addAll(writeLogFiles(partitionPath, schema, genRecords, 
numFiles, enableBlockSeqNos));
-      return logFiles;
-    });
-  }
-
-  @Test
-  public void testAppendsWithSpruiousLogBlocksSecondAttemptPartial() throws 
IOException, URISyntaxException, InterruptedException {
-    testAppendsWithSpruiousLogBlocks(true, (partitionPath, schema, genRecords, 
numFiles, enableBlockSeqNos) -> {
-      Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, 
genRecords, numFiles, enableBlockSeqNos);
-      // re add the same records again
-      Set<HoodieLogFile> logFilesSet2 = writeLogFiles(partitionPath, schema, 
genRecords, numFiles, enableBlockSeqNos);
-      // removing 4th log block to simular partial failure in 2nd attempt
-      List<HoodieLogFile> logFileList2 = new ArrayList<>(logFilesSet2);
-      logFilesSet2.remove(logFileList2.get(logFileList2.size() - 1));
-      logFiles.addAll(logFilesSet2);
-      return logFiles;
-    });
-  }
-
-  private void testAppendsWithSpruiousLogBlocks(
-      boolean enableOptimizedLogBlocksScan,
-      Function5<Set<HoodieLogFile>, StoragePath, Schema, List<IndexedRecord>, 
Integer,
-          Boolean> logGenFunc)
-      throws IOException, URISyntaxException, InterruptedException {
-
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
-    SchemaTestUtil testUtil = new SchemaTestUtil();
-    List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 
400);
-    Set<HoodieLogFile> logFiles = logGenFunc.apply(partitionPath, schema, 
genRecords, 4, true);
-
-    FileCreateUtils.createDeltaCommit(basePath, "100", storage);
-
-    HoodieMergedLogRecordScanner scanner = getLogRecordScanner(logFiles, 
schema, enableOptimizedLogBlocksScan);
-    // even though we have duplicates records, due to block sequence 
reconcile, only one set of blocks should be parsed as valid
-    assertRecordsAndCloseScanner(scanner, genRecords, schema);
-  }
-
-  private void assertRecordsAndCloseScanner(HoodieMergedLogRecordScanner 
scanner, List<IndexedRecord> genRecords, Schema schema) throws IOException {
-    List<IndexedRecord> scannedRecords = new ArrayList<>();
-    for (HoodieRecord record : scanner) {
-      scannedRecords.add((IndexedRecord)
-          ((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
-    }
-
-    assertEquals(sort(genRecords), sort(scannedRecords),
-        "Scanner records count should be the same as appended records");
-    scanner.close();
-  }
-
-  private HoodieMergedLogRecordScanner getLogRecordScanner(Set<HoodieLogFile> 
logFiles, Schema schema,
-                                                           boolean 
enableOptimizedLogBlocksScan) {
-
-    // scan all log blocks (across multiple log files)
-    return HoodieMergedLogRecordScanner.newBuilder()
-        .withStorage(storage)
-        .withBasePath(basePath)
-        .withLogFilePaths(
-            logFiles.stream().sorted(HoodieLogFile.getLogFileComparator())
-                .map(l -> l.getPath().toString()).collect(toList()))
-        .withReaderSchema(schema)
-        .withLatestInstantTime("100")
-        .withMaxMemorySizeInBytes(10240L)
-        .withReverseReader(false)
-        .withBufferSize(BUFFER_SIZE)
-        .withSpillableMapBasePath(spillableBasePath)
-        .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
-        .withBitCaskDiskMapCompressionEnabled(true)
-        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
-        .build();
-  }
-
-  @FunctionalInterface
-  public interface Function5<R, T1, T2, T3, T4, T5> {
-
-    R apply(T1 v1, T2 v2, T3 v3, T4 v4, T5 v5) throws IOException, 
InterruptedException;
-  }
-
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType diskMapType,
@@ -2953,9 +2850,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
       List<IndexedRecord> targetRecords = records.subList(offset, offset + 
targetRecordsCount);
 
       logFiles.add(writer.getLogFile());
-      if (enableBlockSequenceNumbers) {
-        header = getUpdatedHeader(header, blockSeqNo++);
-      }
       writer.appendBlock(getDataBlock(DEFAULT_DATA_BLOCK_TYPE, targetRecords, 
header));
       filesWritten++;
     }
@@ -2965,13 +2859,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     return logFiles;
   }
 
-  private static Map<HeaderMetadataType, String> 
getUpdatedHeader(Map<HeaderMetadataType, String> header, int 
blockSequenceNumber) {
-    Map<HeaderMetadataType, String> updatedHeader = new HashMap<>();
-    updatedHeader.putAll(header);
-    updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, 
String.valueOf(blockSequenceNumber));
-    return updatedHeader;
-  }
-
   /**
    * Utility to convert the given iterator to a List.
    */

Reply via email to