This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c357b4d18ca87ba4d7f8d533cc4d5565ef2b7620
Author: Vova Kolmakov <[email protected]>
AuthorDate: Mon Apr 15 11:31:11 2024 +0700

    [HUDI-7584] Always read log block lazily and remove readBlockLazily argument (#11015)
---
 .../hudi/cli/commands/HoodieLogFileCommand.java    |   3 -
 .../cli/commands/TestHoodieLogFileCommand.java     |   3 -
 .../org/apache/hudi/io/HoodieMergedReadHandle.java |   1 -
 .../hudi/table/action/compact/HoodieCompactor.java |   1 -
 .../run/strategy/JavaExecutionStrategy.java        |   1 -
 .../MultipleSparkJobExecutionStrategy.java         |   1 -
 .../hudi/common/table/TableSchemaResolver.java     |  21 ++--
 .../table/log/AbstractHoodieLogRecordReader.java   |  65 +++++------
 .../table/log/HoodieCDCLogRecordIterator.java      |   3 +-
 .../hudi/common/table/log/HoodieLogFileReader.java |  69 +++++------
 .../hudi/common/table/log/HoodieLogFormat.java     |  13 +--
 .../common/table/log/HoodieLogFormatReader.java    |  14 +--
 .../table/log/HoodieMergedLogRecordScanner.java    |  27 ++---
 .../table/log/HoodieUnMergedLogRecordScanner.java  |  12 +-
 .../hudi/common/table/log/LogReaderUtils.java      |   2 +-
 .../metadata/HoodieMetadataLogRecordReader.java    |   1 -
 .../hudi/metadata/HoodieTableMetadataUtil.java     |   1 -
 .../common/functional/TestHoodieLogFormat.java     | 128 +++++++--------------
 .../examples/quickstart/TestQuickstartData.java    |   1 -
 .../hudi/sink/clustering/ClusteringOperator.java   |   1 -
 .../org/apache/hudi/table/format/FormatUtils.java  |   6 -
 .../test/java/org/apache/hudi/utils/TestData.java  |   1 -
 .../realtime/HoodieMergeOnReadSnapshotReader.java  |   3 -
 .../realtime/RealtimeCompactedRecordReader.java    |   1 -
 .../realtime/RealtimeUnmergedRecordReader.java     |   1 -
 .../reader/DFSHoodieDatasetInputReader.java        |   1 -
 .../src/main/scala/org/apache/hudi/Iterators.scala |   4 -
 .../ShowHoodieLogFileRecordsProcedure.scala        |   1 -
 .../utilities/HoodieMetadataTableValidator.java    | 126 +++++++++-----------
 29 files changed, 188 insertions(+), 324 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 46a9e787ea6..77d9392fcd0 100644
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -238,9 +238,6 @@ public class HoodieLogFileCommand {
               .withLatestInstantTime(
                   client.getActiveTimeline()
                       .getCommitTimeline().lastInstant().get().getTimestamp())
-              .withReadBlocksLazily(
-                  Boolean.parseBoolean(
-                      
HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue()))
               .withReverseReader(
                   Boolean.parseBoolean(
                       
HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 6f75074ff29..dc9cdd1aaf1 100644
--- 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -241,9 +241,6 @@ public class TestHoodieLogFileCommand extends 
CLIFunctionalTestHarness {
         .withLatestInstantTime(INSTANT_TIME)
         .withMaxMemorySizeInBytes(
             HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
-        .withReadBlocksLazily(
-            Boolean.parseBoolean(
-                
HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue()))
         .withReverseReader(
             Boolean.parseBoolean(
                 
HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
index e74ab37f4b6..280e24e46b9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
@@ -126,7 +126,6 @@ public class HoodieMergedReadHandle<T, I, K, O> extends 
HoodieReadHandle<T, I, K
         .withReaderSchema(readerSchema)
         .withLatestInstantTime(instantTime)
         
.withMaxMemorySizeInBytes(IOUtils.getMaxMemoryPerCompaction(hoodieTable.getTaskContextSupplier(),
 config))
-        .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
         .withReverseReader(config.getCompactionReverseLogReadEnabled())
         .withBufferSize(config.getMaxDFSStreamBufferSize())
         .withSpillableMapBasePath(config.getSpillableMapBasePath())
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 940ab9886c3..461794a8f75 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -197,7 +197,6 @@ public abstract class HoodieCompactor<T, I, K, O> 
implements Serializable {
         .withInstantRange(instantRange)
         
.withInternalSchema(internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
         .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
-        .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
         .withReverseReader(config.getCompactionReverseLogReadEnabled())
         .withBufferSize(config.getMaxDFSStreamBufferSize())
         .withSpillableMapBasePath(config.getSpillableMapBasePath())
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index f73238d0210..70e8de465df 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -181,7 +181,6 @@ public abstract class JavaExecutionStrategy<T>
             .withReaderSchema(readerSchema)
             .withLatestInstantTime(instantTime)
             .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
-            .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
             .withReverseReader(config.getCompactionReverseLogReadEnabled())
             .withBufferSize(config.getMaxDFSStreamBufferSize())
             .withSpillableMapBasePath(config.getSpillableMapBasePath())
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index b1fd74a6169..62a510a0b3c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -309,7 +309,6 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
               .withReaderSchema(readerSchema)
               .withLatestInstantTime(instantTime)
               .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
-              .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
               .withReverseReader(config.getCompactionReverseLogReadEnabled())
               .withBufferSize(config.getMaxDFSStreamBufferSize())
               .withSpillableMapBasePath(config.getSpillableMapBasePath())
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 0344331ab75..c5d55cdd2c6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -74,7 +75,6 @@ import java.util.function.Supplier;
 import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
 import static org.apache.hudi.avro.AvroSchemaUtils.containsFieldInSchema;
 import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
-import static 
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
 
 /**
  * Helper class to read schema from data files and log files and to convert it 
between different formats.
@@ -284,13 +284,12 @@ public class TableSchemaResolver {
             Iterator<String> filePaths = 
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePathV2()).values().iterator();
             return Option.of(fetchSchemaFromFiles(filePaths));
           } else {
-            LOG.warn("Could not find any data file written for commit, "
-                + "so could not get schema for table " + 
metaClient.getBasePath());
+            LOG.warn("Could not find any data file written for commit, so 
could not get schema for table {}", metaClient.getBasePathV2());
             return Option.empty();
           }
         default:
-          LOG.error("Unknown table type " + metaClient.getTableType());
-          throw new InvalidTableException(metaClient.getBasePath());
+          LOG.error("Unknown table type {}", metaClient.getTableType());
+          throw new 
InvalidTableException(metaClient.getBasePathV2().toString());
       }
     } catch (IOException e) {
       throw new HoodieException("Failed to read data schema", e);
@@ -328,7 +327,7 @@ public class TableSchemaResolver {
   }
 
   private MessageType readSchemaFromParquetBaseFile(Path parquetFilePath) 
throws IOException {
-    LOG.info("Reading schema from " + parquetFilePath);
+    LOG.info("Reading schema from {}", parquetFilePath);
 
     FileSystem fs = metaClient.getRawFs();
     ParquetMetadata fileFooter =
@@ -337,18 +336,18 @@ public class TableSchemaResolver {
   }
 
   private MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws 
IOException {
-    LOG.info("Reading schema from " + hFilePath);
+    LOG.info("Reading schema from {}", hFilePath);
 
     FileSystem fs = metaClient.getRawFs();
     try (HoodieFileReader fileReader =
              
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
-                 .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, fs.getConf(), 
hFilePath)) {
+                 .getFileReader(ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER, 
fs.getConf(), hFilePath)) {
       return convertAvroSchemaToParquet(fileReader.getSchema());
     }
   }
 
   private MessageType readSchemaFromORCBaseFile(Path orcFilePath) throws 
IOException {
-    LOG.info("Reading schema from " + orcFilePath);
+    LOG.info("Reading schema from {}", orcFilePath);
 
     FileSystem fs = metaClient.getRawFs();
     HoodieAvroOrcReader orcReader = new HoodieAvroOrcReader(fs.getConf(), 
orcFilePath);
@@ -388,7 +387,7 @@ public class TableSchemaResolver {
     // We only need to read the schema from the log block header,
     // so we read the block lazily to avoid reading block content
     // containing the records
-    try (Reader reader = HoodieLogFormat.newReader(fs, new 
HoodieLogFile(path), null, true, false)) {
+    try (Reader reader = HoodieLogFormat.newReader(fs, new 
HoodieLogFile(path), null, false)) {
       HoodieDataBlock lastBlock = null;
       while (reader.hasNext()) {
         HoodieLogBlock block = reader.next();
@@ -473,7 +472,7 @@ public class TableSchemaResolver {
       Schema tableAvroSchema = getTableAvroSchemaFromDataFile();
       return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) 
!= null;
     } catch (Exception e) {
-      LOG.info(String.format("Failed to read operation field from avro schema 
(%s)", e.getMessage()));
+      LOG.info("Failed to read operation field from avro schema ({})", 
e.getMessage());
       return false;
     }
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 6ce80da6d4a..affde833721 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -66,7 +66,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static 
org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BLOCK_IDENTIFIER;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
@@ -150,7 +149,7 @@ public abstract class AbstractHoodieLogRecordReader {
   private final boolean enableOptimizedLogBlocksScan;
 
   protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, 
List<String> logFilePaths,
-                                          Schema readerSchema, String 
latestInstantTime, boolean readBlocksLazily,
+                                          Schema readerSchema, String 
latestInstantTime,
                                           boolean reverseReader, int 
bufferSize, Option<InstantRange> instantRange,
                                           boolean withOperationField, boolean 
forceFullScan,
                                           Option<String> partitionNameOverride,
@@ -243,12 +242,12 @@ public abstract class AbstractHoodieLogRecordReader {
       // Iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
           logFilePaths.stream().map(logFile -> new HoodieLogFile(new 
CachingPath(logFile))).collect(Collectors.toList()),
-          readerSchema, true, reverseReader, bufferSize, 
shouldLookupRecords(), recordKeyField, internalSchema);
+          readerSchema, reverseReader, bufferSize, shouldLookupRecords(), 
recordKeyField, internalSchema);
 
       Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
       while (logFormatReaderWrapper.hasNext()) {
         HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
-        LOG.info("Scanning log file " + logFile);
+        LOG.info("Scanning log file {}", logFile);
         scannedLogFiles.add(logFile);
         totalLogFiles.set(scannedLogFiles.size());
         // Use the HoodieLogFileReader to iterate through the blocks in the 
log file
@@ -284,14 +283,14 @@ public abstract class AbstractHoodieLogRecordReader {
           case HFILE_DATA_BLOCK:
           case AVRO_DATA_BLOCK:
           case PARQUET_DATA_BLOCK:
-            LOG.info("Reading a data block from file " + logFile.getPath() + " 
at instant " + instantTime);
+            LOG.info("Reading a data block from file {} at instant {}", 
logFile.getPath(), instantTime);
             // store the current block
             currentInstantLogBlocks.push(logBlock);
             validLogBlockInstants.add(logBlock);
             updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber, 
attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent);
             break;
           case DELETE_BLOCK:
-            LOG.info("Reading a delete block from file " + logFile.getPath());
+            LOG.info("Reading a delete block from file {}", logFile.getPath());
             // store deletes so can be rolled back
             currentInstantLogBlocks.push(logBlock);
             validLogBlockInstants.add(logBlock);
@@ -314,8 +313,7 @@ public abstract class AbstractHoodieLogRecordReader {
             HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
             String targetInstantForCommandBlock =
                 
logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
-            LOG.info(String.format("Reading a command block %s with 
targetInstantTime %s from file %s", commandBlock.getType(), 
targetInstantForCommandBlock,
-                logFile.getPath()));
+            LOG.info("Reading a command block {} with targetInstantTime {} 
from file {}", commandBlock.getType(), targetInstantForCommandBlock, 
logFile.getPath());
             switch (commandBlock.getType()) { // there can be different types 
of command blocks
               case ROLLBACK_BLOCK:
                 // Rollback older read log block(s)
@@ -328,13 +326,12 @@ public abstract class AbstractHoodieLogRecordReader {
                 currentInstantLogBlocks.removeIf(block -> {
                   // handle corrupt blocks separately since they may not have 
metadata
                   if (block.getBlockType() == CORRUPT_BLOCK) {
-                    LOG.info("Rolling back the last corrupted log block read 
in " + logFile.getPath());
+                    LOG.info("Rolling back the last corrupted log block read 
in {}", logFile.getPath());
                     return true;
                   }
                   if 
(targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME)))
 {
                     // rollback older data block or delete block
-                    LOG.info(String.format("Rolling back an older log block 
read from %s with instantTime %s",
-                        logFile.getPath(), targetInstantForCommandBlock));
+                    LOG.info("Rolling back an older log block read from {} 
with instantTime {}", logFile.getPath(), targetInstantForCommandBlock);
                     return true;
                   }
                   return false;
@@ -347,13 +344,12 @@ public abstract class AbstractHoodieLogRecordReader {
                 validLogBlockInstants = 
validLogBlockInstants.stream().filter(block -> {
                   // handle corrupt blocks separately since they may not have 
metadata
                   if (block.getBlockType() == CORRUPT_BLOCK) {
-                    LOG.info("Rolling back the last corrupted log block read 
in " + logFile.getPath());
+                    LOG.info("Rolling back the last corrupted log block read 
in {}", logFile.getPath());
                     return true;
                   }
                   if 
(targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME)))
 {
                     // rollback older data block or delete block
-                    LOG.info(String.format("Rolling back an older log block 
read from %s with instantTime %s",
-                        logFile.getPath(), targetInstantForCommandBlock));
+                    LOG.info("Rolling back an older log block read from {} 
with instantTime {}", logFile.getPath(), targetInstantForCommandBlock);
                     return false;
                   }
                   return true;
@@ -361,10 +357,9 @@ public abstract class AbstractHoodieLogRecordReader {
 
                 final int numBlocksRolledBack = 
instantLogBlockSizeBeforeRollback - currentInstantLogBlocks.size();
                 totalRollbacks.addAndGet(numBlocksRolledBack);
-                LOG.info("Number of applied rollback blocks " + 
numBlocksRolledBack);
+                LOG.info("Number of applied rollback blocks {}", 
numBlocksRolledBack);
                 if (numBlocksRolledBack == 0) {
-                  LOG.warn(String.format("TargetInstantTime %s invalid or 
extra rollback command block in %s",
-                      targetInstantForCommandBlock, logFile.getPath()));
+                  LOG.warn("TargetInstantTime {} invalid or extra rollback 
command block in {}", targetInstantForCommandBlock, logFile.getPath());
                 }
                 break;
               default:
@@ -372,7 +367,7 @@ public abstract class AbstractHoodieLogRecordReader {
             }
             break;
           case CORRUPT_BLOCK:
-            LOG.info("Found a corrupt block in " + logFile.getPath());
+            LOG.info("Found a corrupt block in {}", logFile.getPath());
             totalCorruptBlocks.incrementAndGet();
             // If there is a corrupt block - we will assume that this was the 
next data block
             currentInstantLogBlocks.push(logBlock);
@@ -460,10 +455,8 @@ public abstract class AbstractHoodieLogRecordReader {
           for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> 
perAttemptEntries : perCommitBlockSequences.entrySet()) {
             Long attemptNo = perAttemptEntries.getKey();
             if (maxAttemptNo != attemptNo) {
-              List<HoodieLogBlock> logBlocksToRemove = 
perCommitBlockSequences.get(attemptNo).stream().map(pair -> 
pair.getValue()).collect(Collectors.toList());
-              logBlocksToRemove.forEach(logBlockToRemove -> {
-                allValidLogBlocks.remove(logBlockToRemove);
-              });
+              List<HoodieLogBlock> logBlocksToRemove = 
perCommitBlockSequences.get(attemptNo).stream().map(Pair::getValue).collect(Collectors.toList());
+              logBlocksToRemove.forEach(logBlockToRemove -> 
allValidLogBlocks.remove(logBlockToRemove));
             }
           }
         }
@@ -478,12 +471,12 @@ public abstract class AbstractHoodieLogRecordReader {
     LOG.warn("Duplicate log blocks found ");
     for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> 
entry : blockSequenceMapPerCommit.entrySet()) {
       if (entry.getValue().size() > 1) {
-        LOG.warn("\tCommit time " + entry.getKey());
+        LOG.warn("\tCommit time {}", entry.getKey());
         Map<Long, List<Pair<Integer, HoodieLogBlock>>> value = 
entry.getValue();
         for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> attemptsSeq 
: value.entrySet()) {
-          LOG.warn("\t\tAttempt number " + attemptsSeq.getKey());
-          attemptsSeq.getValue().forEach(entryValue -> LOG.warn("\t\t\tLog 
block sequence no : " + entryValue.getKey() + ", log file "
-              + 
entryValue.getValue().getBlockContentLocation().get().getLogFile().getPath().toString()));
+          LOG.warn("\t\tAttempt number {}", attemptsSeq.getKey());
+          attemptsSeq.getValue().forEach(entryValue -> LOG.warn("\t\t\tLog 
block sequence no : {}, log file {}",
+              entryValue.getKey(), 
entryValue.getValue().getBlockContentLocation().get().getLogFile().getPath().toString()));
         }
       }
     }
@@ -556,7 +549,7 @@ public abstract class AbstractHoodieLogRecordReader {
       // Iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
           logFilePaths.stream().map(logFile -> new HoodieLogFile(new 
CachingPath(logFile))).collect(Collectors.toList()),
-          readerSchema, true, reverseReader, bufferSize, 
shouldLookupRecords(), recordKeyField, internalSchema);
+          readerSchema, reverseReader, bufferSize, shouldLookupRecords(), 
recordKeyField, internalSchema);
 
       /**
        * Scanning log blocks and placing the compacted blocks at the right 
place require two traversals.
@@ -603,7 +596,7 @@ public abstract class AbstractHoodieLogRecordReader {
        */
       while (logFormatReaderWrapper.hasNext()) {
         HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
-        LOG.info("Scanning log file " + logFile);
+        LOG.info("Scanning log file {}", logFile);
         scannedLogFiles.add(logFile);
         totalLogFiles.set(scannedLogFiles.size());
         // Use the HoodieLogFileReader to iterate through the blocks in the 
log file
@@ -612,7 +605,7 @@ public abstract class AbstractHoodieLogRecordReader {
         totalLogBlocks.incrementAndGet();
         // Ignore the corrupt blocks. No further handling is required for them.
         if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
-          LOG.info("Found a corrupt block in " + logFile.getPath());
+          LOG.info("Found a corrupt block in {}", logFile.getPath());
           totalCorruptBlocks.incrementAndGet();
           continue;
         }
@@ -647,12 +640,12 @@ public abstract class AbstractHoodieLogRecordReader {
             instantToBlocksMap.put(instantTime, logBlocksList);
             break;
           case COMMAND_BLOCK:
-            LOG.info("Reading a command block from file " + logFile.getPath());
+            LOG.info("Reading a command block from file {}", 
logFile.getPath());
             // This is a command block - take appropriate action based on the 
command
             HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
 
             // Rollback blocks contain information of instants that are 
failed, collect them in a set..
-            if (commandBlock.getType().equals(ROLLBACK_BLOCK)) {
+            if 
(commandBlock.getType().equals(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK))
 {
               totalRollbacks.incrementAndGet();
               String targetInstantForCommandBlock =
                   logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
@@ -669,7 +662,7 @@ public abstract class AbstractHoodieLogRecordReader {
       }
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Ordered instant times seen " + orderedInstantsList);
+        LOG.debug("Ordered instant times seen {}", orderedInstantsList);
       }
 
       int numBlocksRolledBack = 0;
@@ -725,10 +718,10 @@ public abstract class AbstractHoodieLogRecordReader {
           validBlockInstants.add(compactedFinalInstantTime);
         }
       }
-      LOG.info("Number of applied rollback blocks " + numBlocksRolledBack);
+      LOG.info("Number of applied rollback blocks {}", numBlocksRolledBack);
 
       if (LOG.isDebugEnabled()) {
-        LOG.info("Final view of the Block time to compactionBlockMap " + 
blockTimeToCompactionBlockTimeMap);
+        LOG.info("Final view of the Block time to compactionBlockMap {}", 
blockTimeToCompactionBlockTimeMap);
       }
 
       // merge the last read block when all the blocks are done reading
@@ -816,7 +809,7 @@ public abstract class AbstractHoodieLogRecordReader {
   private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, 
int numLogFilesSeen,
                                              Option<KeySpec> keySpecOpt) 
throws Exception {
     while (!logBlocks.isEmpty()) {
-      LOG.info("Number of remaining logblocks to merge " + logBlocks.size());
+      LOG.info("Number of remaining logblocks to merge {}", logBlocks.size());
       // poll the element at the bottom of the stack since that's the order it 
was inserted
       HoodieLogBlock lastBlock = logBlocks.pollLast();
       switch (lastBlock.getBlockType()) {
@@ -1022,8 +1015,6 @@ public abstract class AbstractHoodieLogRecordReader {
 
     public abstract Builder withLatestInstantTime(String latestInstantTime);
 
-    public abstract Builder withReadBlocksLazily(boolean readBlocksLazily);
-
     public abstract Builder withReverseReader(boolean reverseReader);
 
     public abstract Builder withBufferSize(int bufferSize);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
index b2464345a1d..e5938bdefb0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
@@ -82,8 +82,7 @@ public class HoodieCDCLogRecordIterator implements 
ClosableIterator<IndexedRecor
     try {
       closeReader();
       if (cdcLogFileIter.hasNext()) {
-        reader = new HoodieLogFileReader(fs, cdcLogFileIter.next(), cdcSchema,
-            HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false);
+        reader = new HoodieLogFileReader(fs, cdcLogFileIter.next(), cdcSchema, 
HoodieLogFileReader.DEFAULT_BUFFER_SIZE);
         return reader.hasNext();
       }
       return false;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index c7289106f48..c1daf5e32d1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -64,7 +65,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-import static 
org.apache.hudi.common.config.HoodieReaderConfig.USE_NATIVE_HFILE_READER;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
@@ -77,6 +77,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
   private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1024 * 1024; // 1 MB
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieLogFileReader.class);
+  private static final String REVERSE_LOG_READER_HAS_NOT_BEEN_ENABLED = 
"Reverse log reader has not been enabled";
 
   private final FileSystem fs;
   private final Configuration hadoopConf;
@@ -86,7 +87,6 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   private final Schema readerSchema;
   private final InternalSchema internalSchema;
   private final String keyField;
-  private final boolean readBlockLazily;
   private long reverseLogFilePosition;
   private long lastReverseLogFilePosition;
   private final boolean reverseReader;
@@ -94,26 +94,22 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   private boolean closed = false;
   private SeekableDataInputStream inputStream;
 
-  public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema 
readerSchema, int bufferSize,
-                             boolean readBlockLazily) throws IOException {
-    this(fs, logFile, readerSchema, bufferSize, readBlockLazily, false);
+  public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema 
readerSchema, int bufferSize) throws IOException {
+    this(fs, logFile, readerSchema, bufferSize, false);
   }
 
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema 
readerSchema, int bufferSize,
-                             boolean readBlockLazily, boolean reverseReader) 
throws IOException {
-    this(fs, logFile, readerSchema, bufferSize, readBlockLazily, 
reverseReader, false,
-        HoodieRecord.RECORD_KEY_METADATA_FIELD);
+                             boolean reverseReader) throws IOException {
+    this(fs, logFile, readerSchema, bufferSize, reverseReader, false, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
   }
 
-  public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema 
readerSchema, int bufferSize,
-                             boolean readBlockLazily, boolean reverseReader, 
boolean enableRecordLookups,
-                             String keyField) throws IOException {
-    this(fs, logFile, readerSchema, bufferSize, readBlockLazily, 
reverseReader, enableRecordLookups, keyField, 
InternalSchema.getEmptyInternalSchema());
+  public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema 
readerSchema, int bufferSize, boolean reverseReader,
+                             boolean enableRecordLookups, String keyField) 
throws IOException {
+    this(fs, logFile, readerSchema, bufferSize, reverseReader, 
enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema());
   }
 
-  public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema 
readerSchema, int bufferSize,
-                             boolean readBlockLazily, boolean reverseReader, 
boolean enableRecordLookups,
-                             String keyField, InternalSchema internalSchema) 
throws IOException {
+  public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema 
readerSchema, int bufferSize, boolean reverseReader,
+                             boolean enableRecordLookups, String keyField, 
InternalSchema internalSchema) throws IOException {
     this.fs = fs;
     this.hadoopConf = fs.getConf();
     // NOTE: We repackage {@code HoodieLogFile} here to make sure that the 
provided path
@@ -124,7 +120,6 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
     this.bufferSize = bufferSize;
     this.inputStream = getDataInputStream(fs, this.logFile, bufferSize);
     this.readerSchema = readerSchema;
-    this.readBlockLazily = readBlockLazily;
     this.reverseReader = reverseReader;
     this.enableRecordLookups = enableRecordLookups;
     this.keyField = keyField;
@@ -180,7 +175,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
 
     // 6. Read the content or skip content based on IO vs Memory trade-off by 
client
     long contentPosition = inputStream.getPos();
-    boolean shouldReadLazily = readBlockLazily && 
nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION;
+    boolean shouldReadLazily = nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION;
     Option<byte[]> content = HoodieLogBlock.tryReadContent(inputStream, 
contentLength, shouldReadLazily);
 
     // 7. Read footer if any
@@ -204,7 +199,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
         if (nextBlockVersion.getVersion() == 
HoodieLogFormatVersion.DEFAULT_VERSION) {
           return HoodieAvroDataBlock.getBlock(content.get(), readerSchema, 
internalSchema);
         } else {
-          return new HoodieAvroDataBlock(() -> getDataInputStream(fs, 
this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc,
+          return new HoodieAvroDataBlock(() -> getDataInputStream(fs, 
this.logFile, bufferSize), content, true, logBlockContentLoc,
               getTargetReaderSchemaForBlock(), header, footer, keyField);
         }
 
@@ -212,25 +207,25 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
         checkState(nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION,
             String.format("HFile block could not be of version (%d)", 
HoodieLogFormatVersion.DEFAULT_VERSION));
         return new HoodieHFileDataBlock(
-            () -> getDataInputStream(fs, this.logFile, bufferSize), content, 
readBlockLazily, logBlockContentLoc,
+            () -> getDataInputStream(fs, this.logFile, bufferSize), content, 
true, logBlockContentLoc,
             Option.ofNullable(readerSchema), header, footer, 
enableRecordLookups, logFile.getPath(),
-            ConfigUtils.getBooleanWithAltKeys(fs.getConf(), 
USE_NATIVE_HFILE_READER));
+            ConfigUtils.getBooleanWithAltKeys(fs.getConf(), 
HoodieReaderConfig.USE_NATIVE_HFILE_READER));
 
       case PARQUET_DATA_BLOCK:
         checkState(nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION,
             String.format("Parquet block could not be of version (%d)", 
HoodieLogFormatVersion.DEFAULT_VERSION));
 
-        return new HoodieParquetDataBlock(() -> getDataInputStream(fs, 
this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc,
+        return new HoodieParquetDataBlock(() -> getDataInputStream(fs, 
this.logFile, bufferSize), content, true, logBlockContentLoc,
             getTargetReaderSchemaForBlock(), header, footer, keyField);
 
       case DELETE_BLOCK:
-        return new HoodieDeleteBlock(content, () -> getDataInputStream(fs, 
this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), 
header, footer);
+        return new HoodieDeleteBlock(content, () -> getDataInputStream(fs, 
this.logFile, bufferSize), true, Option.of(logBlockContentLoc), header, footer);
 
       case COMMAND_BLOCK:
-        return new HoodieCommandBlock(content, () -> getDataInputStream(fs, 
this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), 
header, footer);
+        return new HoodieCommandBlock(content, () -> getDataInputStream(fs, 
this.logFile, bufferSize), true, Option.of(logBlockContentLoc), header, footer);
 
       case CDC_DATA_BLOCK:
-        return new HoodieCDCDataBlock(() -> getDataInputStream(fs, 
this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, 
readerSchema, header, keyField);
+        return new HoodieCDCDataBlock(() -> getDataInputStream(fs, 
this.logFile, bufferSize), content, true, logBlockContentLoc, readerSchema, 
header, keyField);
 
       default:
         throw new HoodieNotSupportedException("Unsupported Block " + 
blockType);
@@ -261,18 +256,18 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   }
 
   private HoodieLogBlock createCorruptBlock(long blockStartPos) throws 
IOException {
-    LOG.info("Log " + logFile + " has a corrupted block at " + blockStartPos);
+    LOG.info("Log {} has a corrupted block at {}", logFile, blockStartPos);
     inputStream.seek(blockStartPos);
     long nextBlockOffset = scanForNextAvailableBlockOffset();
     // Rewind to the initial start and read corrupted bytes till the 
nextBlockOffset
     inputStream.seek(blockStartPos);
-    LOG.info("Next available block in " + logFile + " starts at " + 
nextBlockOffset);
+    LOG.info("Next available block in {} starts at {}", logFile, 
nextBlockOffset);
     int corruptedBlockSize = (int) (nextBlockOffset - blockStartPos);
     long contentPosition = inputStream.getPos();
-    Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, 
corruptedBlockSize, readBlockLazily);
+    Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, 
corruptedBlockSize, true);
     HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
         new HoodieLogBlock.HoodieLogBlockContentLocation(hadoopConf, logFile, 
contentPosition, corruptedBlockSize, nextBlockOffset);
-    return new HoodieCorruptBlock(corruptedBytes, () -> getDataInputStream(fs, 
this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), new 
HashMap<>(), new HashMap<>());
+    return new HoodieCorruptBlock(corruptedBytes, () -> getDataInputStream(fs, 
this.logFile, bufferSize), true, Option.of(logBlockContentLoc), new 
HashMap<>(), new HashMap<>());
   }
 
   private boolean isBlockCorrupted(int blocksize) throws IOException {
@@ -293,7 +288,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
       // So we have to shorten the footer block size by the size of magic hash
       blockSizeFromFooter = inputStream.readLong() - magicBuffer.length;
     } catch (EOFException e) {
-      LOG.info("Found corrupted block in file " + logFile + " with block 
size(" + blocksize + ") running past EOF");
+      LOG.info("Found corrupted block in file {} with block size({}) running 
past EOF", logFile, blocksize);
       // this is corrupt
       // This seek is required because contract of seek() is different for 
naked DFSInputStream vs BufferedFSInputStream
       // release-3.1.0-RC1/DFSInputStream.java#L1455
@@ -303,8 +298,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
     }
 
     if (blocksize != blockSizeFromFooter) {
-      LOG.info("Found corrupted block in file " + logFile + ". Header block 
size(" + blocksize
-          + ") did not match the footer block size(" + blockSizeFromFooter + 
")");
+      LOG.info("Found corrupted block in file {}. Header block size({}) did 
not match the footer block size({})", logFile, blocksize, blockSizeFromFooter);
       inputStream.seek(currentPos);
       return true;
     }
@@ -315,7 +309,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
       return false;
     } catch (CorruptedLogFileException e) {
       // This is a corrupted block
-      LOG.info("Found corrupted block in file " + logFile + ". No magic hash 
found right after footer block size entry");
+      LOG.info("Found corrupted block in file {}. No magic hash found right 
after footer block size entry", logFile);
       return true;
     } finally {
       inputStream.seek(currentPos);
@@ -348,7 +342,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   @Override
   public void close() throws IOException {
     if (!closed) {
-      LOG.info("Closing Log file reader " +  logFile.getFileName());
+      LOG.info("Closing Log file reader {}",  logFile.getFileName());
       if (null != this.inputStream) {
         this.inputStream.close();
       }
@@ -411,7 +405,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   public boolean hasPrev() {
     try {
       if (!this.reverseReader) {
-        throw new HoodieNotSupportedException("Reverse log reader has not been 
enabled");
+        throw new 
HoodieNotSupportedException(REVERSE_LOG_READER_HAS_NOT_BEEN_ENABLED);
       }
       reverseLogFilePosition = lastReverseLogFilePosition;
       reverseLogFilePosition -= Long.BYTES;
@@ -433,7 +427,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   public HoodieLogBlock prev() throws IOException {
 
     if (!this.reverseReader) {
-      throw new HoodieNotSupportedException("Reverse log reader has not been 
enabled");
+      throw new 
HoodieNotSupportedException(REVERSE_LOG_READER_HAS_NOT_BEEN_ENABLED);
     }
     long blockSize = inputStream.readLong();
     long blockEndPos = inputStream.getPos();
@@ -443,8 +437,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
     } catch (Exception e) {
       // this could be a corrupt block
       inputStream.seek(blockEndPos);
-      throw new CorruptedLogFileException("Found possible corrupted block, 
cannot read log file in reverse, "
-          + "fallback to forward reading of logfile");
+      throw new CorruptedLogFileException("Found possible corrupted block, 
cannot read log file in reverse, fallback to forward reading of logfile");
     }
     boolean hasNext = hasNext();
     reverseLogFilePosition -= blockSize;
@@ -460,7 +453,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   public long moveToPrev() throws IOException {
 
     if (!this.reverseReader) {
-      throw new HoodieNotSupportedException("Reverse log reader has not been 
enabled");
+      throw new 
HoodieNotSupportedException(REVERSE_LOG_READER_HAS_NOT_BEEN_ENABLED);
     }
     inputStream.seek(lastReverseLogFilePosition);
     long blockSize = inputStream.readLong();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
index 5e7d0806fae..12a80c07a91 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
@@ -258,8 +258,7 @@ public interface HoodieLogFormat {
           // Use rollover write token as write token to create new log file 
with tokens
           logWriteToken = rolloverLogWriteToken;
         }
-        LOG.info("Computed the next log version for " + logFileId + " in " + 
parentPath + " as " + logVersion
-            + " with write-token " + logWriteToken);
+        LOG.info("Computed the next log version for {} in {} as {} with 
write-token {}", logFileId, parentPath, logVersion, logWriteToken);
       }
 
       if (logWriteToken == null) {
@@ -279,7 +278,7 @@ public interface HoodieLogFormat {
 
       Path logPath = new Path(parentPath,
           FSUtils.makeLogFileName(logFileId, fileExtension, instantTime, 
logVersion, logWriteToken));
-      LOG.info("HoodieLogFile on path " + logPath);
+      LOG.info("HoodieLogFile on path {}", logPath);
       HoodieLogFile logFile = new HoodieLogFile(logPath, fileLen);
 
       if (bufferSize == null) {
@@ -302,13 +301,11 @@ public interface HoodieLogFormat {
 
   static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile 
logFile, Schema readerSchema)
       throws IOException {
-    return new HoodieLogFileReader(fs, logFile, readerSchema, 
HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false);
+    return new HoodieLogFileReader(fs, logFile, readerSchema, 
HoodieLogFileReader.DEFAULT_BUFFER_SIZE);
   }
 
-  static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile 
logFile, Schema readerSchema,
-      boolean readBlockLazily, boolean reverseReader) throws IOException {
-    return new HoodieLogFileReader(fs, logFile, readerSchema, 
HoodieLogFileReader.DEFAULT_BUFFER_SIZE, readBlockLazily,
-        reverseReader);
+  static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile 
logFile, Schema readerSchema, boolean reverseReader) throws IOException {
+    return new HoodieLogFileReader(fs, logFile, readerSchema, 
HoodieLogFileReader.DEFAULT_BUFFER_SIZE, reverseReader);
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index 3c4737af8d0..f21091e5df0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -41,27 +41,25 @@ public class HoodieLogFormatReader implements 
HoodieLogFormat.Reader {
   private final FileSystem fs;
   private final Schema readerSchema;
   private final InternalSchema internalSchema;
-  private final boolean readBlocksLazily;
   private final String recordKeyField;
   private final boolean enableInlineReading;
   private final int bufferSize;
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieLogFormatReader.class);
 
-  HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema 
readerSchema, boolean readBlocksLazily,
+  HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema 
readerSchema,
                         boolean reverseLogReader, int bufferSize, boolean 
enableRecordLookups,
                         String recordKeyField, InternalSchema internalSchema) 
throws IOException {
     this.logFiles = logFiles;
     this.fs = fs;
     this.readerSchema = readerSchema;
-    this.readBlocksLazily = readBlocksLazily;
     this.bufferSize = bufferSize;
     this.recordKeyField = recordKeyField;
     this.enableInlineReading = enableRecordLookups;
     this.internalSchema = internalSchema == null ? 
InternalSchema.getEmptyInternalSchema() : internalSchema;
-    if (logFiles.size() > 0) {
+    if (!logFiles.isEmpty()) {
       HoodieLogFile nextLogFile = logFiles.remove(0);
-      this.currentReader = new HoodieLogFileReader(fs, nextLogFile, 
readerSchema, bufferSize, readBlocksLazily, false,
+      this.currentReader = new HoodieLogFileReader(fs, nextLogFile, 
readerSchema, bufferSize, false,
           enableRecordLookups, recordKeyField, internalSchema);
     }
   }
@@ -83,16 +81,16 @@ public class HoodieLogFormatReader implements 
HoodieLogFormat.Reader {
       return false;
     } else if (currentReader.hasNext()) {
       return true;
-    } else if (logFiles.size() > 0) {
+    } else if (!logFiles.isEmpty()) {
       try {
         HoodieLogFile nextLogFile = logFiles.remove(0);
         this.currentReader.close();
-        this.currentReader = new HoodieLogFileReader(fs, nextLogFile, 
readerSchema, bufferSize, readBlocksLazily, false,
+        this.currentReader = new HoodieLogFileReader(fs, nextLogFile, 
readerSchema, bufferSize, false,
             enableInlineReading, recordKeyField, internalSchema);
       } catch (IOException io) {
         throw new HoodieIOException("unable to initialize read with log file 
", io);
       }
-      LOG.info("Moving to the next reader for logfile " + 
currentReader.getLogFile());
+      LOG.info("Moving to the next reader for logfile {}", 
currentReader.getLogFile());
       return hasNext();
     }
     return false;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 9062641f1a7..c3cf2f97ab8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -92,7 +92,7 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
 
   @SuppressWarnings("unchecked")
   private HoodieMergedLogRecordScanner(FileSystem fs, String basePath, 
List<String> logFilePaths, Schema readerSchema,
-                                       String latestInstantTime, Long 
maxMemorySizeInBytes, boolean readBlocksLazily,
+                                       String latestInstantTime, Long 
maxMemorySizeInBytes,
                                        boolean reverseReader, int bufferSize, 
String spillableMapBasePath,
                                        Option<InstantRange> instantRange,
                                        ExternalSpillableMap.DiskMapType 
diskMapType,
@@ -103,7 +103,7 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
                                        Option<String> keyFieldOverride,
                                        boolean enableOptimizedLogBlocksScan, 
HoodieRecordMerger recordMerger,
                                       Option<HoodieTableMetaClient> 
hoodieTableMetaClientOption) {
-    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, 
readBlocksLazily, reverseReader, bufferSize,
+    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, 
reverseReader, bufferSize,
         instantRange, withOperationField, forceFullScan, partitionName, 
internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger,
         hoodieTableMetaClientOption);
     try {
@@ -206,12 +206,14 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
     this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
     this.numMergedRecordsInLog = records.size();
 
-    LOG.info("Number of log files scanned => " + logFilePaths.size());
-    LOG.info("MaxMemoryInBytes allowed for compaction => " + 
maxMemorySizeInBytes);
-    LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " 
+ records.getInMemoryMapNumEntries());
-    LOG.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap => 
" + records.getCurrentInMemoryMapSize());
-    LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + 
records.getDiskBasedMapNumEntries());
-    LOG.info("Size of file spilled to disk => " + 
records.getSizeOfFileOnDiskInBytes());
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Number of log files scanned => {}", logFilePaths.size());
+      LOG.info("MaxMemoryInBytes allowed for compaction => {}", 
maxMemorySizeInBytes);
+      LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => 
{}", records.getInMemoryMapNumEntries());
+      LOG.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap 
=> {}", records.getCurrentInMemoryMapSize());
+      LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => 
{}", records.getDiskBasedMapNumEntries());
+      LOG.info("Size of file spilled to disk => {}", 
records.getSizeOfFileOnDiskInBytes());
+    }
   }
 
   @Override
@@ -321,7 +323,6 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
     private Schema readerSchema;
     private InternalSchema internalSchema = 
InternalSchema.getEmptyInternalSchema();
     private String latestInstantTime;
-    private boolean readBlocksLazily;
     private boolean reverseReader;
     private int bufferSize;
     // specific configurations
@@ -373,12 +374,6 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
       return this;
     }
 
-    @Override
-    public Builder withReadBlocksLazily(boolean readBlocksLazily) {
-      this.readBlocksLazily = readBlocksLazily;
-      return this;
-    }
-
     @Override
     public Builder withReverseReader(boolean reverseReader) {
       this.reverseReader = reverseReader;
@@ -470,7 +465,7 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
       ValidationUtils.checkArgument(recordMerger != null);
 
       return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, 
readerSchema,
-          latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, 
reverseReader,
+          latestInstantTime, maxMemorySizeInBytes, reverseReader,
           bufferSize, spillableMapBasePath, instantRange,
           diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, 
forceFullScan,
           Option.ofNullable(partitionName), internalSchema, 
Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 4d870618e7b..492d6299a0d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -43,11 +43,11 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordReade
   private final LogRecordScannerCallback callback;
 
   private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, 
List<String> logFilePaths, Schema readerSchema,
-                                         String latestInstantTime, boolean 
readBlocksLazily, boolean reverseReader, int bufferSize,
+                                         String latestInstantTime, boolean 
reverseReader, int bufferSize,
                                          LogRecordScannerCallback callback, 
Option<InstantRange> instantRange, InternalSchema internalSchema,
                                          boolean enableOptimizedLogBlocksScan, 
HoodieRecordMerger recordMerger,
                                          Option<HoodieTableMetaClient> 
hoodieTableMetaClientOption) {
-    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, 
readBlocksLazily, reverseReader, bufferSize, instantRange,
+    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, 
reverseReader, bufferSize, instantRange,
         false, true, Option.empty(), internalSchema, Option.empty(), 
enableOptimizedLogBlocksScan, recordMerger,
          hoodieTableMetaClientOption);
     this.callback = callback;
@@ -104,7 +104,6 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordReade
     private Schema readerSchema;
     private InternalSchema internalSchema;
     private String latestInstantTime;
-    private boolean readBlocksLazily;
     private boolean reverseReader;
     private int bufferSize;
     private Option<InstantRange> instantRange = Option.empty();
@@ -147,11 +146,6 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordReade
       return this;
     }
 
-    public Builder withReadBlocksLazily(boolean readBlocksLazily) {
-      this.readBlocksLazily = readBlocksLazily;
-      return this;
-    }
-
     public Builder withReverseReader(boolean reverseReader) {
       this.reverseReader = reverseReader;
       return this;
@@ -196,7 +190,7 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordReade
       ValidationUtils.checkArgument(recordMerger != null);
 
       return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, 
readerSchema,
-          latestInstantTime, readBlocksLazily, reverseReader, bufferSize, 
callback, instantRange,
+          latestInstantTime, reverseReader, bufferSize, callback, instantRange,
           internalSchema, enableOptimizedLogBlocksScan, recordMerger, 
Option.ofNullable(hoodieTableMetaClient));
     }
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
index 768085c322c..93383df332f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
@@ -52,7 +52,7 @@ public class LogReaderUtils {
   private static Schema readSchemaFromLogFileInReverse(FileSystem fs, 
HoodieActiveTimeline activeTimeline, HoodieLogFile hoodieLogFile)
       throws IOException {
     // set length for the HoodieLogFile as it will be leveraged by 
HoodieLogFormat.Reader with reverseReading enabled
-    Reader reader = HoodieLogFormat.newReader(fs, hoodieLogFile, null, true, 
true);
+    Reader reader = HoodieLogFormat.newReader(fs, hoodieLogFile, null, true);
     Schema writerSchema = null;
     HoodieTimeline completedTimeline = 
activeTimeline.getCommitsTimeline().filterCompletedInstants();
     while (reader.hasPrev()) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
index 900260b9413..3cd0a9b0da1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -139,7 +139,6 @@ public class HoodieMetadataLogRecordReader implements 
Closeable {
             // NOTE: Merging of Metadata Table's records is currently handled 
using {@code HoodiePreCombineAvroRecordMerger}
             //       for compatibility purposes; In the future it {@code 
HoodieMetadataPayload} semantic
             //       will be migrated to its own custom instance of {@code 
RecordMerger}
-            .withReadBlocksLazily(true)
             .withReverseReader(false)
             .withOperationField(false);
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 480ae76a5a1..b25d6741b83 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1801,7 +1801,6 @@ public class HoodieTableMetadataUtil {
             .withLogFilePaths(logFilePaths)
             .withReaderSchema(HoodieAvroUtils.getRecordKeySchema())
             
.withLatestInstantTime(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(""))
-            .withReadBlocksLazily(configuration.get().getBoolean("", true))
             .withReverseReader(false)
             
.withMaxMemorySizeInBytes(configuration.get().getLongBytes(MAX_MEMORY_FOR_COMPACTION.key(),
 DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES))
             
.withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index d4cb5021afc..9e7314cf245 100755
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -440,8 +440,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     }
     writer.close();
 
-    Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), 
SchemaTestUtil.getSimpleSchema(),
-        true, true);
+    Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), 
SchemaTestUtil.getSimpleSchema(), true);
     assertTrue(reader.hasNext(), "We wrote a block, we should be able to read 
it");
     HoodieLogBlock nextBlock = reader.next();
     assertEquals(DEFAULT_DATA_BLOCK_TYPE, nextBlock.getBlockType(), "The next 
block should be a data block");
@@ -635,7 +634,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @MethodSource("testArguments")
   public void 
testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType 
diskMapType,
                                                   boolean isCompressionEnabled,
-                                                  boolean readBlocksLazily,
                                                   boolean 
enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
 
@@ -657,7 +655,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withReaderSchema(schema)
         .withLatestInstantTime("100")
         .withMaxMemorySizeInBytes(10240L)
-        .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
         .withBufferSize(BUFFER_SIZE)
         .withSpillableMapBasePath(spillableBasePath)
@@ -763,7 +760,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withReaderSchema(schema)
         .withLatestInstantTime("100")
         .withMaxMemorySizeInBytes(10240L)
-        .withReadBlocksLazily(true)
         .withReverseReader(false)
         .withBufferSize(BUFFER_SIZE)
         .withSpillableMapBasePath(spillableBasePath)
@@ -783,7 +779,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @MethodSource("testArguments")
   public void 
testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType diskMapType,
                                                 boolean isCompressionEnabled,
-                                                boolean readBlocksLazily,
                                                 boolean 
enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     // Generate 3 delta-log files w/ random records
@@ -805,7 +800,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withReaderSchema(schema)
         .withLatestInstantTime("100")
         .withMaxMemorySizeInBytes(10240L)
-        .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
         .withBufferSize(BUFFER_SIZE)
         .withSpillableMapBasePath(spillableBasePath)
@@ -873,7 +867,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @MethodSource("testArguments")
   public void 
testBasicAppendAndPartialScanningByKeyPrefixes(ExternalSpillableMap.DiskMapType 
diskMapType,
                                                              boolean 
isCompressionEnabled,
-                                                             boolean 
readBlocksLazily,
                                                              boolean 
enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     // Generate 3 delta-log files w/ random records
@@ -895,7 +888,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withReaderSchema(schema)
         .withLatestInstantTime("100")
         .withMaxMemorySizeInBytes(10240L)
-        .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
         .withBufferSize(BUFFER_SIZE)
         .withSpillableMapBasePath(spillableBasePath)
@@ -1158,7 +1150,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @MethodSource("testArguments")
   public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType 
diskMapType,
                                            boolean isCompressionEnabled,
-                                           boolean readBlocksLazily,
                                            boolean 
enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
@@ -1194,7 +1185,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     Set<String> originalKeys =
         copyOfRecords1.stream().map(s -> ((GenericRecord) 
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
             .collect(Collectors.toSet());
-    checkLogBlocksAndKeys("100", schema, readBlocksLazily, diskMapType, 
isCompressionEnabled, enableOptimizedLogBlocksScan,
+    checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, 
enableOptimizedLogBlocksScan,
         200, 200, Option.of(originalKeys));
   }
 
@@ -1202,7 +1193,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.DiskMapType 
diskMapType,
                                                            boolean 
isCompressionEnabled,
-                                                           boolean 
readBlocksLazily,
                                                            boolean 
enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
@@ -1258,7 +1248,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     Set<String> originalKeys =
         copyOfRecords1.stream().map(s -> ((GenericRecord) 
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
             .collect(Collectors.toSet());
-    checkLogBlocksAndKeys("102", schema, readBlocksLazily, diskMapType, 
isCompressionEnabled, enableOptimizedLogBlocksScan,
+    checkLogBlocksAndKeys("102", schema, diskMapType, isCompressionEnabled, 
enableOptimizedLogBlocksScan,
         200, 200, Option.of(originalKeys));
   }
 
@@ -1327,7 +1317,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     Set<String> originalKeys =
         copyOfRecords1.stream().map(s -> ((GenericRecord) 
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
             .collect(Collectors.toSet());
-    checkLogBlocksAndKeys("103", schema, true, diskMapType, 
isCompressionEnabled, enableOptimizedLogBlocksScan,
+    checkLogBlocksAndKeys("103", schema, diskMapType, isCompressionEnabled, 
enableOptimizedLogBlocksScan,
         200, 200, Option.of(originalKeys));
   }
 
@@ -1335,7 +1325,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.DiskMapType 
diskMapType,
                                                            boolean 
isCompressionEnabled,
-                                                           boolean 
readBlocksLazily,
                                                            boolean 
enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
@@ -1393,7 +1382,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withReaderSchema(schema)
         .withLatestInstantTime("102")
         .withMaxMemorySizeInBytes(10240L)
-        .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
         .withBufferSize(BUFFER_SIZE)
         .withSpillableMapBasePath(spillableBasePath)
@@ -1441,7 +1429,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withReaderSchema(schema)
         .withLatestInstantTime("103")
         .withMaxMemorySizeInBytes(10240L)
-        .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
         .withBufferSize(BUFFER_SIZE)
         .withSpillableMapBasePath(spillableBasePath)
@@ -1476,7 +1463,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithCommitBeforeAndAfterRollback(ExternalSpillableMap.DiskMapType
 diskMapType,
                                                                       boolean 
isCompressionEnabled,
-                                                                      boolean 
readBlocksLazily,
                                                                       boolean 
enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
@@ -1549,7 +1535,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withReaderSchema(schema)
         .withLatestInstantTime("103")
         .withMaxMemorySizeInBytes(10240L)
-        .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
         .withBufferSize(BUFFER_SIZE)
         .withSpillableMapBasePath(spillableBasePath)
@@ -1582,8 +1567,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType 
diskMapType,
-                                                        boolean 
isCompressionEnabled,
-                                                        boolean 
readBlocksLazily)
+                                                        boolean 
isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
@@ -1664,7 +1648,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withReaderSchema(schema)
         .withLatestInstantTime("104")
         .withMaxMemorySizeInBytes(10240L)
-        .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
         .withBufferSize(BUFFER_SIZE)
         .withSpillableMapBasePath(spillableBasePath)
@@ -1703,7 +1686,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.DiskMapType 
diskMapType,
                                                          boolean 
isCompressionEnabled,
-                                                         boolean 
readBlocksLazily,
                                                          boolean 
enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
 
@@ -1760,7 +1742,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.appendBlock(commandBlock);
     writer.close();
 
-    checkLogBlocksAndKeys("100", schema, readBlocksLazily, diskMapType, 
isCompressionEnabled, enableOptimizedLogBlocksScan,
+    checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, 
enableOptimizedLogBlocksScan,
         0, 0, Option.empty());
     FileCreateUtils.deleteDeltaCommit(basePath, "100", fs);
   }
@@ -1769,7 +1751,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillableMap.DiskMapType
 diskMapType,
                                                                  boolean 
isCompressionEnabled,
-                                                                 boolean 
readBlocksLazily,
                                                                  boolean 
enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
 
@@ -1810,7 +1791,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.appendBlock(commandBlock);
     writer.close();
 
-    checkLogBlocksAndKeys("100", schema, readBlocksLazily, diskMapType, 
isCompressionEnabled, enableOptimizedLogBlocksScan,
+    checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, 
enableOptimizedLogBlocksScan,
         0, 0, Option.empty());
     FileCreateUtils.deleteDeltaCommit(basePath, "100", fs);
   }
@@ -1819,7 +1800,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.DiskMapType 
diskMapType,
                                                          boolean 
isCompressionEnabled,
-                                                         boolean 
readBlocksLazily,
                                                          boolean 
enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
@@ -1847,7 +1827,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.appendBlock(commandBlock);
     writer.close();
 
-    checkLogBlocksAndKeys("100", schema, readBlocksLazily, diskMapType, 
isCompressionEnabled, enableOptimizedLogBlocksScan,
+    checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, 
enableOptimizedLogBlocksScan,
         100, 100, Option.empty());
   }
 
@@ -1855,7 +1835,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillableMap.DiskMapType
 diskMapType,
                                                                   boolean 
isCompressionEnabled,
-                                                                  boolean 
readBlocksLazily,
                                                                   boolean 
enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
 
@@ -1900,7 +1879,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.appendBlock(commandBlock);
     writer.close();
 
-    checkLogBlocksAndKeys("101", schema, readBlocksLazily, diskMapType, 
isCompressionEnabled, enableOptimizedLogBlocksScan,
+    checkLogBlocksAndKeys("101", schema, diskMapType, isCompressionEnabled, 
enableOptimizedLogBlocksScan,
         0, 0, Option.empty());
   }
 
@@ -1909,7 +1888,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @MethodSource("testArguments")
   public void 
testLogReaderWithDifferentVersionsOfDeleteBlocks(ExternalSpillableMap.DiskMapType
 diskMapType,
                                                                boolean 
isCompressionEnabled,
-                                                               boolean 
readBlocksLazily,
                                                                boolean 
enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
@@ -1990,7 +1968,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withReaderSchema(schema)
         .withLatestInstantTime("103")
         .withMaxMemorySizeInBytes(10240L)
-        .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
         .withBufferSize(BUFFER_SIZE)
         .withSpillableMapBasePath(spillableBasePath)
@@ -2057,7 +2034,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     FileCreateUtils.createDeltaCommit(basePath, "101", fs);
 
     // Should be able to read all 110 records
-    checkLogBlocksAndKeys("101", schema, true, 
ExternalSpillableMap.DiskMapType.BITCASK, false,
+    checkLogBlocksAndKeys("101", schema, 
ExternalSpillableMap.DiskMapType.BITCASK, false,
         false, 110, 110, Option.empty());
 
     // Write a rollback for commit 100 which is not the latest commit
@@ -2068,7 +2045,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.appendBlock(commandBlock);
 
     // Should only be able to read 10 records from commit 101
-    checkLogBlocksAndKeys("101", schema, true, 
ExternalSpillableMap.DiskMapType.BITCASK, false,
+    checkLogBlocksAndKeys("101", schema, 
ExternalSpillableMap.DiskMapType.BITCASK, false,
         false, 10, 10, Option.empty());
 
     // Write a rollback for commit 101 which is the latest commit
@@ -2080,7 +2057,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.close();
 
     // Should not read any records as both commits are rolled back
-    checkLogBlocksAndKeys("101", schema, true, 
ExternalSpillableMap.DiskMapType.BITCASK, false,
+    checkLogBlocksAndKeys("101", schema, 
ExternalSpillableMap.DiskMapType.BITCASK, false,
         false, 0, 0, Option.empty());
   }
 
@@ -2088,7 +2065,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalSpillableMap.DiskMapType
 diskMapType,
                                                                          
boolean isCompressionEnabled,
-                                                                         
boolean readBlocksLazily,
                                                                          
boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
 
@@ -2171,7 +2147,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.appendBlock(commandBlock);
     writer.close();
 
-    checkLogBlocksAndKeys("101", schema, true, 
ExternalSpillableMap.DiskMapType.BITCASK, false,
+    checkLogBlocksAndKeys("101", schema, 
ExternalSpillableMap.DiskMapType.BITCASK, false,
         false, 0, 0, Option.empty());
     FileCreateUtils.deleteDeltaCommit(basePath, "100", fs);
   }
@@ -2179,8 +2155,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @ParameterizedTest
   @MethodSource("testArgumentsWithoutOptimizedScanArg")
   public void 
testAvroLogRecordReaderWithMixedInsertsCorruptsRollbackAndMergedLogBlock(ExternalSpillableMap.DiskMapType
 diskMapType,
-                                                                               
        boolean isCompressionEnabled,
-                                                                               
        boolean readBlocksLazily)
+                                                                               
        boolean isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
 
     // Write blocks in this manner.
@@ -2344,7 +2319,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withReaderSchema(schema)
         .withLatestInstantTime("108")
         .withMaxMemorySizeInBytes(10240L)
-        .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
         .withBufferSize(BUFFER_SIZE)
         .withSpillableMapBasePath(spillableBasePath)
@@ -2384,7 +2358,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   private void testAvroLogRecordReaderMergingMultipleLogFiles(int 
numRecordsInLog1, int numRecordsInLog2,
                                                               
ExternalSpillableMap.DiskMapType diskMapType,
                                                               boolean 
isCompressionEnabled,
-                                                              boolean 
readBlocksLazily,
                                                               boolean 
enableOptimizedLogBlocksScan) {
     try {
       // Write one Data block with same InstantTime (written in same batch)
@@ -2433,7 +2406,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
           .withReaderSchema(schema)
           .withLatestInstantTime("100")
           .withMaxMemorySizeInBytes(10240L)
-          .withReadBlocksLazily(readBlocksLazily)
           .withReverseReader(false)
           .withBufferSize(BUFFER_SIZE)
           .withSpillableMapBasePath(spillableBasePath)
@@ -2454,47 +2426,43 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt(ExternalSpillableMap.DiskMapType
 diskMapType,
                                                                        boolean 
isCompressionEnabled,
-                                                                       boolean 
readBlocksLazily,
                                                                        boolean 
enableOptimizedLogBlocksScan) {
     /*
      * FIRST_ATTEMPT_FAILED:
      * Original task from the stage attempt failed, but subsequent stage retry 
succeeded.
      */
     testAvroLogRecordReaderMergingMultipleLogFiles(77, 100,
-        diskMapType, isCompressionEnabled, readBlocksLazily, 
enableOptimizedLogBlocksScan);
+        diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan);
   }
 
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt(ExternalSpillableMap.DiskMapType
 diskMapType,
                                                                         
boolean isCompressionEnabled,
-                                                                        
boolean readBlocksLazily,
                                                                         
boolean enableOptimizedLogBlocksScan) {
     /*
      * SECOND_ATTEMPT_FAILED:
      * Original task from stage attempt succeeded, but subsequent retry 
attempt failed.
      */
     testAvroLogRecordReaderMergingMultipleLogFiles(100, 66,
-        diskMapType, isCompressionEnabled, readBlocksLazily, 
enableOptimizedLogBlocksScan);
+        diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan);
   }
 
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderTasksSucceededInBothStageAttempts(ExternalSpillableMap.DiskMapType
 diskMapType,
                                                                        boolean 
isCompressionEnabled,
-                                                                       boolean 
readBlocksLazily,
                                                                        boolean 
enableOptimizedLogBlocksScan) {
     /*
      * BOTH_ATTEMPTS_SUCCEEDED:
      * Original task from the stage attempt and duplicate task from the stage 
retry succeeded.
      */
     testAvroLogRecordReaderMergingMultipleLogFiles(100, 100,
-        diskMapType, isCompressionEnabled, readBlocksLazily, 
enableOptimizedLogBlocksScan);
+        diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan);
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testBasicAppendAndReadInReverse(boolean readBlocksLazily)
+  @Test
+  public void testBasicAppendAndReadInReverse()
       throws IOException, URISyntaxException, InterruptedException {
     Writer writer =
         
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
@@ -2534,7 +2502,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     FileCreateUtils.createDeltaCommit(basePath, "100", fs);
 
     HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), 
fs.getFileStatus(writer.getLogFile().getPath()).getLen());
-    try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, 
SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, readBlocksLazily, true)) {
+    try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, 
SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, true)) {
 
       assertTrue(reader.hasPrev(), "Last block should be available");
       HoodieLogBlock prevBlock = reader.prev();
@@ -2568,9 +2536,8 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     }
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testAppendAndReadOnCorruptedLogInReverse(boolean 
readBlocksLazily)
+  @Test
+  public void testAppendAndReadOnCorruptedLogInReverse()
       throws IOException, URISyntaxException, InterruptedException {
     Writer writer =
         
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
@@ -2615,8 +2582,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     // First round of reads - we should be able to read the first block and 
then EOF
     HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), 
fs.getFileStatus(writer.getLogFile().getPath()).getLen());
 
-    try (HoodieLogFileReader reader =
-             new HoodieLogFileReader(fs, logFile, schema, BUFFER_SIZE, 
readBlocksLazily, true)) {
+    try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, 
schema, BUFFER_SIZE, true)) {
 
       assertTrue(reader.hasPrev(), "Last block should be available");
       HoodieLogBlock block = reader.prev();
@@ -2629,9 +2595,8 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     }
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testBasicAppendAndTraverseInReverse(boolean readBlocksLazily)
+  @Test
+  public void testBasicAppendAndTraverseInReverse()
       throws IOException, URISyntaxException, InterruptedException {
     Writer writer =
         
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
@@ -2668,7 +2633,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
 
     HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), 
fs.getFileStatus(writer.getLogFile().getPath()).getLen());
     try (HoodieLogFileReader reader =
-             new HoodieLogFileReader(fs, logFile, 
SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, readBlocksLazily, true)) {
+             new HoodieLogFileReader(fs, logFile, 
SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, true)) {
 
       assertTrue(reader.hasPrev(), "Third block should be available");
       reader.moveToPrev();
@@ -2758,7 +2723,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
 
     List<GenericRecord> projectedRecords = 
HoodieAvroUtils.rewriteRecords(records, projectedSchema);
 
-    try (Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), 
projectedSchema, true, false)) {
+    try (Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), 
projectedSchema, false)) {
       assertTrue(reader.hasNext(), "First block should be available");
 
       HoodieLogBlock nextBlock = reader.next();
@@ -2826,29 +2791,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   }
 
   private static Stream<Arguments> testArguments() {
-    // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, 
Arg3: readBlocksLazily, Arg4: enableOptimizedLogBlocksScan
-    return Stream.of(
-        arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false, 
true),
-        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false, 
true),
-        arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, false, true),
-        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, false, 
true),
-        arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, true, true),
-        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, true, 
true),
-        arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, true, true),
-        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true, true),
-        arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false, 
false),
-        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false, 
false),
-        arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, false, 
false),
-        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, false, 
false),
-        arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, true, 
false),
-        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, true, 
false),
-        arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, true, false),
-        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true, false)
-    );
-  }
-
-  private static Stream<Arguments> testArgumentsWithoutOptimizedScanArg() {
-    // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, 
Arg3: readBlocksLazily
+    // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, 
Arg3: enableOptimizedLogBlocksScan
     return Stream.of(
         arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false),
         arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false),
@@ -2861,6 +2804,16 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     );
   }
 
+  private static Stream<Arguments> testArgumentsWithoutOptimizedScanArg() {
+    // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled
+    return Stream.of(
+        arguments(ExternalSpillableMap.DiskMapType.BITCASK, false),
+        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false),
+        arguments(ExternalSpillableMap.DiskMapType.BITCASK, true),
+        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true)
+    );
+  }
+
   private static Set<HoodieLogFile> writeLogFiles(Path partitionPath,
                                                   Schema schema,
                                                   List<IndexedRecord> records,
@@ -2970,8 +2923,8 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     return reader;
   }
 
-  private void checkLogBlocksAndKeys(String latestInstantTime, Schema schema, 
boolean readBlocksLazily,
-                                     ExternalSpillableMap.DiskMapType 
diskMapType, boolean isCompressionEnabled, boolean 
enableOptimizedLogBlocksScan, int expectedTotalRecords,
+  private void checkLogBlocksAndKeys(String latestInstantTime, Schema schema, 
ExternalSpillableMap.DiskMapType diskMapType,
+                                     boolean isCompressionEnabled, boolean 
enableOptimizedLogBlocksScan, int expectedTotalRecords,
                                      int expectedTotalKeys, 
Option<Set<String>> expectedKeys) throws IOException {
     List<String> allLogFiles =
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", 
HoodieLogFile.DELTA_EXTENSION, "100")
@@ -2984,7 +2937,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withReaderSchema(schema)
         .withLatestInstantTime(latestInstantTime)
         .withMaxMemorySizeInBytes(10240L)
-        .withReadBlocksLazily(readBlocksLazily)
         .withReverseReader(false)
         .withBufferSize(BUFFER_SIZE)
         .withSpillableMapBasePath(spillableBasePath)
diff --git 
a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
 
b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
index 7fc93c776f5..6790b602186 100644
--- 
a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
+++ 
b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
@@ -353,7 +353,6 @@ public class TestQuickstartData {
         .withLogFilePaths(logPaths)
         .withReaderSchema(readSchema)
         .withLatestInstantTime(instant)
-        .withReadBlocksLazily(false)
         .withReverseReader(false)
         .withBufferSize(16 * 1024 * 1024)
         .withMaxMemorySizeInBytes(1024 * 1024L)
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index ecfc26a10dc..5970dc782b6 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -282,7 +282,6 @@ public class ClusteringOperator extends 
TableStreamOperator<ClusteringCommitEven
             .withReaderSchema(readerSchema)
             .withLatestInstantTime(instantTime)
             .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
-            
.withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
             
.withReverseReader(writeConfig.getCompactionReverseLogReadEnabled())
             .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
             .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index baa9f21216b..b10b5be9c47 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -159,7 +159,6 @@ public class FormatUtils {
         .withReaderSchema(logSchema)
         .withInternalSchema(internalSchema)
         .withLatestInstantTime(split.getLatestCommit())
-        .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
         .withReverseReader(false)
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
         .withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes())
@@ -201,10 +200,6 @@ public class FormatUtils {
           .withReaderSchema(logSchema)
           .withInternalSchema(internalSchema)
           .withLatestInstantTime(split.getLatestCommit())
-          .withReadBlocksLazily(
-              string2Boolean(
-                  
flinkConf.getString(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-                      
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
           .withReverseReader(false)
           .withBufferSize(
               
flinkConf.getInteger(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
@@ -265,7 +260,6 @@ public class FormatUtils {
         .withLogFilePaths(logPaths)
         .withReaderSchema(logSchema)
         .withLatestInstantTime(latestInstantTime)
-        .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
         .withReverseReader(false)
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
         .withMaxMemorySizeInBytes(writeConfig.getMaxMemoryPerPartitionMerge())
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 65c8e82ada1..91e10a3fb9c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -949,7 +949,6 @@ public class TestData {
         .withLogFilePaths(logPaths)
         .withReaderSchema(readSchema)
         .withLatestInstantTime(instant)
-        .withReadBlocksLazily(false)
         .withReverseReader(false)
         .withBufferSize(16 * 1024 * 1024)
         .withMaxMemorySizeInBytes(1024 * 1024L)
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
index 4a39b6548f9..b7ec3b12403 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
@@ -48,8 +48,6 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
-import static 
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP;
-import static 
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED;
 import static 
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE;
 import static 
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH;
 import static 
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
@@ -185,7 +183,6 @@ public class HoodieMergeOnReadSnapshotReader extends 
AbstractRealtimeRecordReade
         .withReaderSchema(readerSchema)
         .withLatestInstantTime(latestInstantTime)
         .withMaxMemorySizeInBytes(getMaxCompactionMemoryInBytes(jobConf))
-        
.withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
 DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
         .withReverseReader(false)
         .withBufferSize(jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, 
DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
         .withSpillableMapBasePath(jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, 
DEFAULT_SPILLABLE_MAP_BASE_PATH))
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 61933608e94..5ef1c8d692d 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -89,7 +89,6 @@ public class RealtimeCompactedRecordReader extends 
AbstractRealtimeRecordReader
         .withReaderSchema(getLogScannerReaderSchema())
         .withLatestInstantTime(split.getMaxCommitTime())
         
.withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf))
-        
.withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
 HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
         .withReverseReader(false)
         
.withBufferSize(jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
 HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
         
.withSpillableMapBasePath(jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
 HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index dd0ef5bf15d..ed40f4dd47c 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -81,7 +81,6 @@ class RealtimeUnmergedRecordReader extends 
AbstractRealtimeRecordReader
           .withLogFilePaths(split.getDeltaLogPaths())
           .withReaderSchema(getReaderSchema())
           .withLatestInstantTime(split.getMaxCommitTime())
-          
.withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
 HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
           .withReverseReader(false)
           
.withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
 HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));
 
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 02d534d5b98..edd68ca7baa 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -287,7 +287,6 @@ public class DFSHoodieDatasetInputReader extends 
DFSDeltaInputReader {
               .filterCompletedInstants().lastInstant().get().getTimestamp())
           .withMaxMemorySizeInBytes(
               HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
-          .withReadBlocksLazily(true)
           .withReverseReader(false)
           
.withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
           
.withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index 3a86a2cc738..b6a5ae7a956 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -385,10 +385,6 @@ object LogFileIterator extends SparkAdapterSupport {
         // NOTE: This part shall only be reached when at least one log is 
present in the file-group
         //       entailing that table has to have at least one commit
         .withLatestInstantTime(tableState.latestCommitTimestamp.get)
-        .withReadBlocksLazily(
-          
Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-            
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
-            .getOrElse(false))
         .withReverseReader(false)
         .withInternalSchema(internalSchema)
         .withBufferSize(
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index cca1fd1da0d..fa220acf7b2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -71,7 +71,6 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure 
with ProcedureBuil
         .withLogFilePaths(logFilePaths.asJava)
         .withReaderSchema(schema)
         
.withLatestInstantTime(client.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp)
-        
.withReadBlocksLazily(java.lang.Boolean.parseBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue))
         
.withReverseReader(java.lang.Boolean.parseBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue))
         
.withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue)
         
.withMaxMemorySizeInBytes(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 8f19ac76287..fa34e24cc5e 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -36,6 +37,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -99,13 +101,6 @@ import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
-import static 
org.apache.hudi.common.config.HoodieReaderConfig.USE_NATIVE_HFILE_READER;
-import static 
org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD;
-import static 
org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_FIELD;
-import static 
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
-import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static 
org.apache.hudi.hadoop.fs.CachingPath.getPathWithoutSchemeAndAuthority;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
@@ -498,11 +493,9 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           instant = new HoodieInstant(HoodieInstant.State.REQUESTED, 
instant.getAction(), instant.getTimestamp());
           HoodieCleanerPlan cleanerPlan = 
CleanerUtils.getCleanerPlan(metaClient, instant);
 
-          return 
cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().flatMap(cleanerFileInfoList
 -> {
-            return cleanerFileInfoList.stream().map(fileInfo -> {
-              return new Path(fileInfo.getFilePath()).getName();
-            });
-          });
+          return 
cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().flatMap(cleanerFileInfoList
 ->
+            cleanerFileInfoList.stream().map(fileInfo -> new 
Path(fileInfo.getFilePath()).getName())
+          );
 
         } catch (IOException e) {
           throw new HoodieIOException("Error reading cleaner metadata for " + 
instant);
@@ -533,7 +526,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           engineContext.parallelize(allPartitions, 
allPartitions.size()).map(partitionPath -> {
             try {
               validateFilesInPartition(metadataTableBasedContext, 
fsBasedContext, partitionPath, finalBaseFilesForCleaning);
-              LOG.info(String.format("Metadata table validation succeeded for 
partition %s (partition %s)", partitionPath, taskLabels));
+              LOG.info("Metadata table validation succeeded for partition {} 
(partition {})", partitionPath, taskLabels);
               return Pair.<Boolean, Exception>of(true, null);
             } catch (HoodieValidationException e) {
               LOG.error(
@@ -569,10 +562,10 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       }
 
       if (finalResult) {
-        LOG.info(String.format("Metadata table validation succeeded (%s).", 
taskLabels));
+        LOG.info("Metadata table validation succeeded ({}).", taskLabels);
         return true;
       } else {
-        LOG.warn(String.format("Metadata table validation failed (%s).", 
taskLabels));
+        LOG.warn("Metadata table validation failed ({}).", taskLabels);
         return false;
       }
     } catch (Exception e) {
@@ -644,9 +637,9 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           if (partitionCreationTimeOpt.isPresent() && 
!completedTimeline.containsInstant(partitionCreationTimeOpt.get())) {
             Option<HoodieInstant> lastInstant = 
completedTimeline.lastInstant();
             if (lastInstant.isPresent()
-                && 
HoodieTimeline.compareTimestamps(partitionCreationTimeOpt.get(), GREATER_THAN, 
lastInstant.get().getTimestamp())) {
-              LOG.warn("Ignoring additional partition " + partitionFromDMT + 
", as it was deduced to be part of a "
-                  + "latest completed commit which was inflight when FS based 
listing was polled.");
+                && 
HoodieTimeline.compareTimestamps(partitionCreationTimeOpt.get(), 
HoodieTimeline.GREATER_THAN, lastInstant.get().getTimestamp())) {
+              LOG.warn("Ignoring additional partition {}, as it was deduced to 
be part of a "
+                  + "latest completed commit which was inflight when FS based 
listing was polled.", partitionFromDMT);
               actualAdditionalPartitionsInMDT.remove(partitionFromDMT);
             }
           }
@@ -702,7 +695,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
           return lastInstant.isPresent()
               && HoodieTimeline.compareTimestamps(
-              instantTime, LESSER_THAN_OR_EQUALS, 
lastInstant.get().getTimestamp());
+              instantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, 
lastInstant.get().getTimestamp());
         }
         return true;
       } else {
@@ -782,8 +775,8 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           .collect(Collectors.toList());
     }
 
-    LOG.debug("All file slices from metadata: " + allFileSlicesFromMeta + ". 
For partitions " + partitionPath);
-    LOG.debug("All file slices from direct listing: " + allFileSlicesFromFS + 
". For partitions " + partitionPath);
+    LOG.debug("All file slices from metadata: {}. For partitions {}", 
allFileSlicesFromMeta, partitionPath);
+    LOG.debug("All file slices from direct listing: {}. For partitions {}", 
allFileSlicesFromFS, partitionPath);
     validateFileSlices(
         allFileSlicesFromMeta, allFileSlicesFromFS, partitionPath,
         fsBasedContext.getMetaClient(), "all file groups");
@@ -809,8 +802,8 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       latestFilesFromFS = 
fsBasedContext.getSortedLatestBaseFileList(partitionPath);
     }
 
-    LOG.debug("Latest base file from metadata: " + latestFilesFromMetadata + 
". For partitions " + partitionPath);
-    LOG.debug("Latest base file from direct listing: " + latestFilesFromFS + 
". For partitions " + partitionPath);
+    LOG.debug("Latest base file from metadata: {}. For partitions {}", 
latestFilesFromMetadata, partitionPath);
+    LOG.debug("Latest base file from direct listing: {}. For partitions {}", 
latestFilesFromFS, partitionPath);
 
     validate(latestFilesFromMetadata, latestFilesFromFS, partitionPath, 
"latest base files");
   }
@@ -834,8 +827,8 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       latestFileSlicesFromFS = 
fsBasedContext.getSortedLatestFileSliceList(partitionPath);
     }
 
-    LOG.debug("Latest file list from metadata: " + 
latestFileSlicesFromMetadataTable + ". For partition " + partitionPath);
-    LOG.debug("Latest file list from direct listing: " + 
latestFileSlicesFromFS + ". For partition " + partitionPath);
+    LOG.debug("Latest file list from metadata: {}. For partition {}", 
latestFileSlicesFromMetadataTable, partitionPath);
+    LOG.debug("Latest file list from direct listing: {}. For partition {}", 
latestFileSlicesFromFS, partitionPath);
 
     validateFileSlices(
         latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, 
partitionPath,
@@ -906,7 +899,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     String basePath = metaClient.getBasePathV2().toString();
     long countKeyFromTable = 
sparkEngineContext.getSqlContext().read().format("hudi")
         .load(basePath)
-        .select(RECORD_KEY_METADATA_FIELD)
+        .select(HoodieRecord.RECORD_KEY_METADATA_FIELD)
         .count();
     long countKeyFromRecordIndex = 
sparkEngineContext.getSqlContext().read().format("hudi")
         .load(getMetadataTableBasePath(basePath))
@@ -915,14 +908,12 @@ public class HoodieMetadataTableValidator implements 
Serializable {
         .count();
 
     if (countKeyFromTable != countKeyFromRecordIndex) {
-      String message = String.format("Validation of record index count failed: 
"
-              + "%s entries from record index metadata, %s keys from the data 
table: " + cfg.basePath,
-          countKeyFromRecordIndex, countKeyFromTable);
+      String message = String.format("Validation of record index count failed: 
%s entries from record index metadata, %s keys from the data table: %s",
+          countKeyFromRecordIndex, countKeyFromTable, cfg.basePath);
       LOG.error(message);
       throw new HoodieValidationException(message);
     } else {
-      LOG.info(String.format(
-          "Validation of record index count succeeded: %s entries. Table: %s", 
countKeyFromRecordIndex, cfg.basePath));
+      LOG.info("Validation of record index count succeeded: {} entries. Table: 
{}", countKeyFromRecordIndex, cfg.basePath);
     }
   }
 
@@ -932,11 +923,11 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     String basePath = metaClient.getBasePathV2().toString();
     JavaPairRDD<String, Pair<String, String>> keyToLocationOnFsRdd =
         sparkEngineContext.getSqlContext().read().format("hudi").load(basePath)
-            .select(RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, 
FILENAME_METADATA_FIELD)
+            .select(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD, 
HoodieRecord.FILENAME_METADATA_FIELD)
             .toJavaRDD()
-            .mapToPair(row -> new 
Tuple2<>(row.getString(row.fieldIndex(RECORD_KEY_METADATA_FIELD)),
-                
Pair.of(row.getString(row.fieldIndex(PARTITION_PATH_METADATA_FIELD)),
-                    
FSUtils.getFileId(row.getString(row.fieldIndex(FILENAME_METADATA_FIELD))))))
+            .mapToPair(row -> new 
Tuple2<>(row.getString(row.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)),
+                
Pair.of(row.getString(row.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)),
+                    
FSUtils.getFileId(row.getString(row.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD))))))
             .cache();
 
     JavaPairRDD<String, Pair<String, String>> keyToLocationFromRecordIndexRdd =
@@ -970,7 +961,6 @@ public class HoodieMetadataTableValidator implements 
Serializable {
         .map(e -> {
           Optional<Pair<String, String>> locationOnFs = e._2._1;
           Optional<Pair<String, String>> locationFromRecordIndex = e._2._2;
-          StringBuilder sb = new StringBuilder();
           List<String> errorSampleList = new ArrayList<>();
           if (locationOnFs.isPresent() && locationFromRecordIndex.isPresent()) 
{
             if 
(locationOnFs.get().getLeft().equals(locationFromRecordIndex.get().getLeft())
@@ -1036,8 +1026,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       LOG.error(message);
       throw new HoodieValidationException(message);
     } else {
-      LOG.info(String.format(
-          "Validation of record index content succeeded: %s entries. Table: 
%s", countKey, cfg.basePath));
+      LOG.info("Validation of record index content succeeded: {} entries. 
Table: {}", countKey, cfg.basePath);
     }
   }
 
@@ -1082,7 +1071,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       LOG.error(message);
       throw new HoodieValidationException(message);
     } else {
-      LOG.info(String.format("Validation of %s succeeded for partition %s for 
table: %s", label, partitionPath, cfg.basePath));
+      LOG.info("Validation of {} succeeded for partition {} for table: {}", 
label, partitionPath, cfg.basePath);
     }
   }
 
@@ -1109,8 +1098,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           mismatch = true;
           break;
         } else {
-          LOG.warn(String.format("There are uncommitted log files in the 
latest file slices "
-              + "but the committed log files match: %s %s", fileSlice1, 
fileSlice2));
+          LOG.warn("There are uncommitted log files in the latest file slices 
but the committed log files match: {} {}", fileSlice1, fileSlice2);
         }
       }
     }
@@ -1122,7 +1110,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       LOG.error(message);
       throw new HoodieValidationException(message);
     } else {
-      LOG.info(String.format("Validation of %s succeeded for partition %s for 
table: %s ", label, partitionPath, cfg.basePath));
+      LOG.info("Validation of {} succeeded for partition {} for table: {}", 
label, partitionPath, cfg.basePath);
     }
   }
 
@@ -1154,13 +1142,11 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     FileSystem fileSystem = metaClient.getFs();
 
     if (hasCommittedLogFiles(fileSystem, fs1LogPathSet, metaClient, 
committedFilesMap)) {
-      LOG.error("The first file slice has committed log files that cause 
mismatching: " + fs1
-          + "; Different log files are: " + fs1LogPathSet);
+      LOG.error("The first file slice has committed log files that cause 
mismatching: {}; Different log files are: {}", fs1, fs1LogPathSet);
       return false;
     }
     if (hasCommittedLogFiles(fileSystem, fs2LogPathSet, metaClient, 
committedFilesMap)) {
-      LOG.error("The second file slice has committed log files that cause 
mismatching: " + fs2
-          + "; Different log files are: " + fs2LogPathSet);
+      LOG.error("The second file slice has committed log files that cause 
mismatching: {}; Different log files are: {}", fs2, fs2LogPathSet);
       return false;
     }
     return true;
@@ -1187,17 +1173,16 @@ public class HoodieMetadataTableValidator implements 
Serializable {
         MessageType messageType =
             TableSchemaResolver.readSchemaFromLogFile(fs, new 
Path(logFilePathStr));
         if (messageType == null) {
-          LOG.warn(String.format("Cannot read schema from log file %s. "
-              + "Skip the check as it's likely being written by an inflight 
instant.", logFilePathStr));
+          LOG.warn("Cannot read schema from log file {}. Skip the check as 
it's likely being written by an inflight instant.", logFilePathStr);
           continue;
         }
         Schema readerSchema = converter.convert(messageType);
         reader =
-            HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePathStr), 
readerSchema, true, false);
+            HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePathStr), 
readerSchema, false);
         // read the avro blocks
         if (reader.hasNext()) {
           HoodieLogBlock block = reader.next();
-          final String instantTime = 
block.getLogBlockHeader().get(INSTANT_TIME);
+          final String instantTime = 
block.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME);
           if (completedInstantsTimeline.containsInstant(instantTime)) {
             // The instant is completed, in active timeline
             // Checking commit metadata only as log files can only be written 
by COMMIT or DELTA_COMMIT
@@ -1225,36 +1210,30 @@ public class HoodieMetadataTableValidator implements 
Serializable {
             // behavior.
             String relativeLogFilePathStr = getRelativePath(basePath, 
logFilePathStr);
             if 
(committedFilesMap.get(instantTime).contains(relativeLogFilePathStr)) {
-              LOG.warn("Log file is committed in an instant in active 
timeline: instantTime="
-                  + instantTime + " " + logFilePathStr);
+              LOG.warn("Log file is committed in an instant in active 
timeline: instantTime={} {}", instantTime, logFilePathStr);
               return true;
             } else {
-              LOG.warn("Log file is uncommitted in a completed instant, likely 
due to retry: "
-                  + "instantTime=" + instantTime + " " + logFilePathStr);
+              LOG.warn("Log file is uncommitted in a completed instant, likely 
due to retry: instantTime={} {}", instantTime, logFilePathStr);
             }
           } else if 
(completedInstantsTimeline.isBeforeTimelineStarts(instantTime)) {
             // The instant is in archived timeline
-            LOG.warn("Log file is committed in an instant in archived 
timeline: instantTime="
-                + instantTime + " " + logFilePathStr);
+            LOG.warn("Log file is committed in an instant in archived 
timeline: instantTime={} {}", instantTime, logFilePathStr);
             return true;
           } else if (inflightInstantsTimeline.containsInstant(instantTime)) {
             // The instant is inflight in active timeline
             // hit an uncommitted block possibly from a failed write
-            LOG.warn("Log file is uncommitted because of an inflight instant: 
instantTime="
-                + instantTime + " " + logFilePathStr);
+            LOG.warn("Log file is uncommitted because of an inflight instant: 
instantTime={} {}", instantTime, logFilePathStr);
           } else {
             // The instant is after the start of the active timeline,
             // but it cannot be found in the active timeline
-            LOG.warn("Log file is uncommitted because the instant is after the 
start of the "
-                + "active timeline but absent or in requested in the active 
timeline: instantTime="
-                + instantTime + " " + logFilePathStr);
+            LOG.warn("Log file is uncommitted because the instant is after the 
start of the active timeline but absent or in requested in the active timeline: 
instantTime={} {}",
+                instantTime, logFilePathStr);
           }
         } else {
-          LOG.warn("There is no log block in " + logFilePathStr);
+          LOG.warn("There is no log block in {}", logFilePathStr);
         }
       } catch (IOException e) {
-        LOG.warn(String.format("Cannot read log file %s: %s. "
-                + "Skip the check as it's likely being written by an inflight 
instant.",
+        LOG.warn(String.format("Cannot read log file %s: %s. Skip the check as 
it's likely being written by an inflight instant.",
             logFilePathStr, e.getMessage()), e);
       } finally {
         FileIOUtils.closeQuietly(reader);
@@ -1289,8 +1268,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
             long toSleepMs = cfg.minValidateIntervalSeconds * 1000 - 
(System.currentTimeMillis() - start);
 
             if (toSleepMs > 0) {
-              LOG.info("Last validate ran less than min validate interval: " + 
cfg.minValidateIntervalSeconds + " s, sleep: "
-                  + toSleepMs + " ms.");
+              LOG.info("Last validate ran less than min validate interval: {} 
s, sleep: {} ms.", cfg.minValidateIntervalSeconds, toSleepMs);
               Thread.sleep(toSleepMs);
             }
           } catch (HoodieValidationException e) {
@@ -1376,7 +1354,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           .build();
       this.fileSystemView = 
FileSystemViewManager.createInMemoryFileSystemView(engineContext,
           metaClient, metadataConfig);
-      this.tableMetadata = HoodieTableMetadata.create(engineContext, 
metadataConfig, metaClient.getBasePath());
+      this.tableMetadata = HoodieTableMetadata.create(engineContext, 
metadataConfig, metaClient.getBasePathV2().toString());
       if 
(metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() > 0) 
{
         this.allColumnNameList = getAllColumnNames();
       }
@@ -1408,7 +1386,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     @SuppressWarnings({"rawtypes", "unchecked"})
     public List<HoodieColumnRangeMetadata<Comparable>> 
getSortedColumnStatsList(
         String partitionPath, List<String> baseFileNameList) {
-      LOG.info("All column names for getting column stats: " + 
allColumnNameList);
+      LOG.info("All column names for getting column stats: {}", 
allColumnNameList);
       if (enableMetadataTable) {
         List<Pair<String, String>> partitionFileNameList = 
baseFileNameList.stream()
             .map(filename -> Pair.of(partitionPath, 
filename)).collect(Collectors.toList());
@@ -1424,7 +1402,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
         return baseFileNameList.stream().flatMap(filename ->
                 new ParquetUtils().readRangeFromParquetMetadata(
                     metaClient.getHadoopConf(),
-                    new 
Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPath), 
filename),
+                    new 
Path(FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPath), 
filename),
                     allColumnNameList).stream())
             .sorted(new HoodieColumnRangeMetadataComparator())
             .collect(Collectors.toList());
@@ -1460,7 +1438,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
         return schemaResolver.getTableAvroSchema().getFields().stream()
             .map(Schema.Field::name).collect(Collectors.toList());
       } catch (Exception e) {
-        throw new HoodieException("Failed to get all column names for " + 
metaClient.getBasePath());
+        throw new HoodieException("Failed to get all column names for " + 
metaClient.getBasePathV2());
       }
     }
 
@@ -1468,17 +1446,17 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       Path path = new 
Path(FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPath), 
filename);
       BloomFilter bloomFilter;
       HoodieConfig hoodieConfig = new HoodieConfig();
-      hoodieConfig.setValue(USE_NATIVE_HFILE_READER,
-          Boolean.toString(ConfigUtils.getBooleanWithAltKeys(props, 
USE_NATIVE_HFILE_READER)));
+      hoodieConfig.setValue(HoodieReaderConfig.USE_NATIVE_HFILE_READER,
+          Boolean.toString(ConfigUtils.getBooleanWithAltKeys(props, 
HoodieReaderConfig.USE_NATIVE_HFILE_READER)));
       try (HoodieFileReader fileReader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
           .getFileReader(hoodieConfig, metaClient.getHadoopConf(), path)) {
         bloomFilter = fileReader.readBloomFilter();
         if (bloomFilter == null) {
-          LOG.error("Failed to read bloom filter for " + path);
+          LOG.error("Failed to read bloom filter for {}", path);
           return Option.empty();
         }
       } catch (IOException e) {
-        LOG.error("Failed to get file reader for " + path + " " + 
e.getMessage());
+        LOG.error("Failed to get file reader for {} {}", path, e.getMessage());
         return Option.empty();
       }
       return Option.of(BloomFilterData.builder()

Reply via email to